In [1]:
import pandas as pd
import chess.pgn
from tqdm import tqdm
from numpy import int32


In [2]:
def load_opening_data() -> pd.DataFrame:
    """Build the opening lookup table, indexed by EPD position.

    Reads the five ECO volume files (a-e) as tab-separated tables indexed by
    their ``epd`` column, appends a synthetic ``"Start"`` row for the initial
    position and drops the ``uci`` column.
    """
    eco_paths = (
        "files/a.tsv",
        "files/b.tsv",
        "files/c.tsv",
        "files/d.tsv",
        "files/e.tsv",
    )
    eco_volumes = [pd.read_csv(path, sep="\t", index_col="epd") for path in eco_paths]

    # The initial position is not in the ECO files; it is added so that it can
    # be looked up under the name "Start" like any other opening.
    start_row = pd.DataFrame(
        {
            "name": ["Start"],
            "epd": ["rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq -"],
            "pgn": None,
            "eco": None,
        }
    ).set_index("epd")

    combined = pd.concat([*eco_volumes, start_row])
    return combined.drop(columns=["uci"])


In [19]:
def load_games(
    n_games: int, pgn_path: str = "files/lichess_elite_2022-04.pgn"
) -> list[chess.pgn.Game]:
    """Load up to ``n_games`` games from a PGN file and return them as a list.

    Args:
        n_games: Maximum number of games to read.
        pgn_path: Location of the PGN file. Written with a forward slash so the
            literal contains no backslash escape and works on every platform.

    Returns:
        The parsed games in file order. The list is shorter than ``n_games``
        when the parser runs out of games first.
    """
    games = []
    # An explicit encoding keeps the result independent of the platform's
    # default text encoding.
    with open(pgn_path, encoding="utf-8") as pgn_file:
        for _ in tqdm(range(n_games), desc="Loading games"):
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                # A None result is treated as "no more games in the file".
                break
            games.append(game)

    return games


def get_positions(games: list[chess.pgn.Game], max_plies: int = 30) -> pd.DataFrame:
    """Collect the EPD string of each position reached early in every game.

    Only the first ``max_plies`` half-moves of the main line are replayed; the
    default of 30 plies corresponds to the first 15 full moves.

    Args:
        games: Parsed games, e.g. as returned by ``load_games``.
        max_plies: Maximum number of half-moves to replay per game.

    Returns:
        A DataFrame built from one list of EPD strings per game, so each row is
        a game and each column a ply. Rows for games shorter than the longest
        one are padded by the DataFrame constructor.
    """
    games_positions = []
    for game in tqdm(games, desc="Extracting positions"):
        # Replay the main line on a single board instead of rebuilding the
        # board from the root for every ply.
        board = game.board()
        positions = []
        # zip() with the range first stops after max_plies moves, or earlier
        # when the game ends, so no exception is needed for short games.
        for _, move in zip(range(max_plies), game.mainline_moves()):
            board.push(move)
            positions.append(board.epd())
        games_positions.append(positions)
    return pd.DataFrame(games_positions)


In [4]:
def get_opening_name(epd: str, openings) -> str:
    """Look up the opening name stored for an EPD position.

    Returns ``None`` when ``epd`` is not a label in the index of ``openings``.
    """
    if epd not in openings.index:
        return None
    return openings.loc[epd, "name"]


def fill_adjacency_matrix(
    positions: pd.DataFrame, openings: pd.DataFrame, adjacency_matrix: pd.DataFrame
) -> pd.DataFrame:
    """Count transitions between named openings across all games.

    Each row of ``positions`` is walked from left to right, starting from the
    opening ``"Start"``. Whenever a position maps to a named opening different
    from the previous one, the cell ``[previous, new]`` is incremented.

    Args:
        positions: One row per game, one EPD string per column.
        openings: Opening table passed to ``get_opening_name`` for the lookup.
        adjacency_matrix: Square count matrix labelled by opening name on both
            axes. NOTE: it is updated in place before being filtered.

    Returns:
        The matrix after ``remove_non_connected_nodes`` has been applied.
    """
    # Iterating rows as plain tuples avoids one iloc scalar lookup per cell.
    rows = positions.itertuples(index=False, name=None)
    for game_positions in tqdm(rows, total=len(positions), desc="Analyzing games"):
        last_opening_name = "Start"
        for epd in game_positions:
            new_opening_name = get_opening_name(epd, openings)
            # Positions without a named opening yield None and are skipped.
            if new_opening_name is not None and new_opening_name != last_opening_name:
                adjacency_matrix.loc[last_opening_name, new_opening_name] += 1
                last_opening_name = new_opening_name

    return remove_non_connected_nodes(adjacency_matrix)


def remove_non_connected_nodes(adjacency_matrix: pd.DataFrame) -> pd.DataFrame:
    """Keep only the nodes that have at least one incoming edge."""
    # any(axis=0) reduces down each column, i.e. over incoming edges;
    # any(axis=1) would reduce along each row, i.e. over outgoing edges.
    has_incoming = adjacency_matrix.ne(0).any(axis=0)
    connected_nodes = adjacency_matrix.loc[has_incoming].index
    return adjacency_matrix.loc[connected_nodes, connected_nodes]


In [5]:
n_games = 10_000
openings = load_opening_data()

# Every distinct opening name labels both a row and a column of the matrix.
opening_names = openings["name"].drop_duplicates()
adjacency_matrix = pd.DataFrame(
    0, index=opening_names, columns=opening_names, dtype=int32
)

games = load_games(n_games=n_games)
positions = get_positions(games)
adjacency_matrix = fill_adjacency_matrix(positions, openings, adjacency_matrix)


Loading games: 100%|██████████| 10000/10000 [00:27<00:00, 362.82it/s]
Extracting positions: 100%|██████████| 10000/10000 [00:59<00:00, 168.78it/s]
Analyzing games: 100%|██████████| 10000/10000 [00:09<00:00, 1009.26it/s]


In [6]:
# adjacency_matrix.to_csv("files/adjacency_matrix_100000.csv")
# occurences = adjacency_matrix.sum(axis=1)
# occurences.to_csv("files/occurences_100000.csv", index_label="Id")
