In [1]:
import numpy as np
import pandas as pd
import plotly.express as px

from os.path import getsize

## Функции для парсинга

In [2]:
def get_next_analysed_game():
    # Пока не найдем игру с анализом
    while True:
        # Записываем строки игры
        buffer = []
        while True:
            line = f.readline()

            # Пустые строки не нужны
            if line == "\n":
                continue

            buffer.append(line)

            # Ходы идут последней строкой в игре
            # и всегда начинаются с "1."
            if line.startswith("1."):
                break
                
        # Если есть анализ - берем
        if ("%eval" in line):
            return buffer

In [3]:
def params_to_dict(str_list):
    """
    Принимает строки в виде
    [key "value"]
    И переводит их в словарь
    https://en.wikipedia.org/wiki/Portable_Game_Notation
    """
    return {
        a: b.strip('"') 
        for a, b in [
            i.strip("\n").strip("[]").split(" ", 1) 
            for i in str_list
        ]
    }

In [4]:
def moves_to_df(moves):
    s = moves.replace("[", "").replace("]", "")
    s = s.split(" ")
    s = s[:-1]
    
    # Если последний ход - мат, для него нет оценки позиции (игра окончена)
    # Поэтому слайсы сдвигаются, время не в той колонке
    # Чтобы исправить, чуть подправим в конце
    if len(s) % 8 != 0:
        s.append(s[-2])
        s[-3] = "#0"

    df = pd.DataFrame.from_dict({
        "Move": s[1::8],
        "Eval": s[4::8],
        "Clock": s[6::8]
    }, orient="index").transpose()
    # orient нужен, если будут списки разной длины
    
    # Добавим номер хода и сторону
    df["MoveNumber"] = (df.index // 2) + 1
    df["MoveSide"] = (df.index % 2)
    
    # Анализ доступен только для первых 200 ходов 
    # Строчки без него не нужны
    df = df.head(200)

    return df

In [5]:
def get_and_parse_next_analysed_game():
    while True:
        data = get_next_analysed_game()

        game = params_to_dict(data[:-1])
        # Отфильтруем по времени сразу, чтобы не парсить ходы
        if game["TimeControl"].split("+")[0] not in ["600", "900"]:
            continue

        return (
            game, 
            moves_to_df(data[-1])
        )

In [6]:
def get_two_dfs(size=10_000):
    games_list = []
    moves_list = []
    
    for i in range(size):
        
        try:
            a, m = get_and_parse_next_analysed_game()
        except:
            continue

        game_id = a["Site"].split("/")[-1]
        a["GameID"] = game_id
        m["GameID"] = game_id

        games_list.append(a)
        moves_list.append(m)
        
    return (
        pd.DataFrame(games_list),
        pd.concat(moves_list)
    )

## Запускаем парсинг

In [10]:
PGN_FILE = "pgn/lichess_db_standard_rated_2023-03.pgn"
print(f"PGN file size (bytes): {getsize(PGN_FILE):,}")

f = open(PGN_FILE, mode="r")
# OFFSET = 0
# f.seek(OFFSET)

# Прочитаем файл до начала следующей партии
# while True:
#     line = f.readline()
#     if line.startswith("1."):
#         break

PGN file size (bytes): 247,668,460,068


In [None]:
batch_size = 10_000
n_batches = 25

In [None]:
for batch in range(1, n_batches + 1):

    df_games, df_moves = get_two_dfs(batch_size)

    df_games = df_games[[
        "GameID",
        "Result",
        "WhiteElo",
        "BlackElo",
        "WhiteRatingDiff",
        "BlackRatingDiff",
        "ECO",
        "Opening",
        "TimeControl",
        "Termination"
    ]]

    df_games["WhiteElo"] = df_games["WhiteElo"].astype(int)
    df_games["BlackElo"] = df_games["BlackElo"].astype(int)

    df_games = df_games[(
        df_games["WhiteRatingDiff"].notna() & 
        df_games["BlackRatingDiff"].notna()
    )]

    df_games["WhiteRatingDiff"] = df_games["WhiteRatingDiff"].astype(int)
    df_games["BlackRatingDiff"] = df_games["BlackRatingDiff"].astype(int)
    
    # Берем только игры с небольшой разницей в рейтинге
    df_games = df_games[
        (df_games["WhiteRatingDiff"].astype(float).abs() <= 20) &
        (df_games["BlackRatingDiff"].astype(float).abs() <= 20) &
        ((df_games["WhiteElo"] - df_games["BlackElo"]).abs() <= 200)
    ]
    
    # Отфильтруем, чтобы id в df_games и df_moves содержали одинаковые id
    ids = set(df_games["GameID"]) & set(df_moves["GameID"])
    df_games = df_games[ df_games["GameID"].isin(ids) ]
    df_moves = df_moves[ df_moves["GameID"].isin(ids) ]
    # assert set(df_games["GameID"]) == set(df_moves["GameID"])

    df_games.to_parquet(f"parsed/batch_{batch}_games.parquet")
    df_moves.to_parquet(f"parsed/batch_{batch}_moves.parquet")

    print(f"#{batch}")

In [None]:
print(f"Last cursor position (bytes): {f.tell():,}")

In [None]:
# Last cursor position (bytes): 21,612,755,641