In [2]:
import chess
import chess.pgn as pgn
import datetime
import os
# NOTE: enumerate() is a builtin; it does not live in itertools and needs no import.
from typing import Iterable, Iterator, Tuple

In [2]:
# Input PGN file that the game iterators in this notebook read from.
# Presumably the Lichess rated standard games dump for 2024-07, going by the
# file name -- confirm against the actual download.
pgn_path = r"./lichess_db_standard_rated_2024-07.pgn"
def gen_datafile_path():
    """Build a timestamped path for a new raw training-data file.

    The file name embeds the current time (datetime.now()) formatted as
    YYYYmmddHHMMSS, so calls made in different seconds give different paths.

    Returns:
        str: a path of the form "./training_data/raw-2024-07--<timestamp>.dat".
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    template = r"./training_data/raw-2024-07--{}.dat"
    return template.format(timestamp)

In [3]:
def games_iter(path, encoding: str = "utf-8") -> Iterator[Tuple[pgn.Game, int]]:
    """Lazily yield every game of a PGN file together with its running index.

    Args:
        path: location of the PGN file to read.
        encoding: text encoding used to open the file. Defaults to UTF-8
            rather than the platform locale, so that non-ASCII header values
            (e.g. player names) decode identically on every OS.

    Yields:
        (game, index) pairs; index starts at 0 and grows by one per game, in
        file order.
    """
    with open(path, "r", encoding=encoding) as handle:
        index = 0
        # read_game() returns None once the stream is exhausted. Compare with
        # None explicitly so iteration never depends on the truthiness of a
        # Game object.
        while (game := pgn.read_game(handle)) is not None:
            yield game, index
            index += 1

In [4]:
def games_with_eval_iter(path) -> Iterator[Tuple[pgn.Game, int]]:
    """Yield only the games from `path` whose first mainline move has an eval.

    Wraps games_iter(path) and forwards its (game, index) pairs, so the index
    still refers to the game's position in the file. A game is dropped when
    eval() of its first mainline node is falsy, or when looking that node up
    raises any exception (for example, a game without moves).
    """
    for game, game_index in games_iter(path):
        try:
            first_node = next(iter(game.mainline()))
            has_eval = bool(first_node.eval())
        except Exception:
            # Best effort: a game whose first move cannot be inspected is
            # treated the same as a game without an evaluation.
            has_eval = False
        if has_eval:
            yield game, game_index

In [5]:
def count_pieces(fen: str):
    """Count the pieces on the board described by a FEN string.

    Only the first space-separated field (the piece placement) is examined;
    every alphabetic character in it counts as one piece, while digits and
    slashes are ignored.
    """
    placement = fen.split(" ")[0]
    return sum(1 for symbol in placement if symbol.isalpha())

def dataset_gen(
        it: Iterator[Tuple[pgn.Game, int]], num_output_data = 1000, filter = lambda board: True, output_path = None
        ):
    """
    Write evaluated positions from a stream of games to a text data file.

    Each game's mainline is replayed move by move. For every accepted node
    that carries an evaluation, one record of six lines followed by a blank
    line is written:

        FEN of the position after the move
        evaluation from White's point of view (eval().white().score())
        game result (1 for "1-0", 0 for "1/2-1/2", -1 for "0-1")
        number of pieces on the board
        index of the game, as supplied by `it`
        ply of the node

    it: iterator of (game, game index) pairs.
    num_output_data: maximum number of records to write.
    filter: called with each mainline node; decides whether the position
        reached by that node is to be accepted (True) or not (False).
    output_path: destination file; defaults to gen_datafile_path(). The file
        is created (together with its parent directory) or truncated.

    Processing of a game stops at its first accepted node that has no
    evaluation. Games whose "Result" header is not one of "1-0", "0-1" or
    "1/2-1/2" are skipped, because no result label can be derived for them.
    """
    if output_path is None:
        output_path = gen_datafile_path()

    result_labels = {"0-1": -1, "1-0": 1, "1/2-1/2": 0}

    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Mode "w" creates or truncates the file portably; no shell command and
    # no interpolation of the path into a command line is needed.
    with open(output_path, "w") as out:
        if num_output_data <= 0:
            return

        written = 0
        for game, count_game in it:
            # The result is constant for the whole game, so look it up once.
            result = result_labels.get(game.headers.get("Result"))
            if result is None:
                continue

            if "FEN" in game.headers:
                board = chess.Board(fen = game.headers["FEN"])
            else:
                board = chess.Board()

            for child_node in game.mainline():
                # Always replay the move, even when the node is rejected by
                # the filter below; otherwise `board` falls out of sync and
                # every later FEN of this game would be wrong.
                board.push(child_node.move)

                if not filter(child_node):
                    continue

                pov_score = child_node.eval()
                if pov_score is None:
                    break
                # NOTE(review): per the python-chess docs, score() returns
                # None for forced-mate evaluations, so such records carry the
                # literal text "None" -- confirm the consumer expects that.
                score = pov_score.white().score()

                fen = board.fen()
                piece_count = count_pieces(fen)
                ply = child_node.ply()
                out.write(f"{fen}\n{score}\n{result}\n{piece_count}\n{count_game}\n{ply}\n\n")

                # Count written records (not visited nodes) and stop exactly
                # at the requested amount.
                written += 1
                if written >= num_output_data:
                    return

In [6]:
# Run the extraction over every game in the PGN file, passing 100000 as
# num_output_data; no output_path is given, so one is generated automatically.
dataset_gen(games_iter(pgn_path), num_output_data=100_000)