In [21]:
import io
import sqlite3

import chess
import chess.engine
import chess.pgn
import pandas as pd
from tqdm import tqdm


In [22]:
def dict_factory(cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict:
    col_names = [col[0] for col in cursor.description]
    return dict(zip(col_names, row))


In [23]:
def create_dataframe(n_games: int, file_path: str) -> pd.DataFrame:
    """Extract the following features from each game in the pgn file:
        URL of the game,
        PGN of the game,
        rhe result

    Return a dataframe.
    """

    urls = []
    pgns = []
    results = []
    with open(file_path) as pgn_file:
        for i in range(n_games):
            exporter = chess.pgn.StringExporter(headers=False)
            game = chess.pgn.read_game(pgn_file)
            if game is not None:
                url = game.headers["LichessURL"]
                pgn = game.accept(exporter)
                termination = game.headers["Termination"]
                if game.headers["Result"] == "1-0":
                    result = 1
                elif game.headers["Result"] == "0-1":
                    result = -1
                else:
                    result = 0
            else:
                break

            if termination == "Normal":
                urls.append(url)
                pgns.append(pgn)
                results.append(result)

    games_extracted = pd.DataFrame(
        [urls, pgns, results], index=["url", "pgn", "result"]
    ).T.set_index("url")

    return games_extracted


In [24]:
def write_games(games: pd.DataFrame) -> None:
    """Create a SQL Database with one table games, which has columns
    URL and PGN"""

    with sqlite3.connect("games2") as con:
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS 
                games(
                    url TEXT ,
                    pgn TEXT,
                    result INTEGER,
                    UNIQUE (url)
                )
            """
        )

        con.executemany(
            """
            INSERT OR IGNORE INTO 
                games(url, pgn, result) 
            VALUES 
                (:url, :pgn, :result)
            """,
            games.itertuples(),
        )
    con.close()


In [25]:
def extract_positions(pgn: str) -> list[tuple[int, str]]:
    game = chess.pgn.read_game(io.StringIO(pgn))
    positions = []
    if game is not None:
        main_line = list(game.mainline())
        for move in main_line:
            board = move.board()
            positions.append((board.ply(), board.fen()))

        return positions
    else:
        raise RuntimeError


In [26]:
def write_positions() -> None:
    """Read the pgn from the games table and write the fen and ply of each move in a new table positions"""

    with sqlite3.connect("games2") as con:
        cur = con.cursor()
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS 
                positions(
                    game_id INTEGER,
                    ply  INTEGER NOT NULL,
                    fen TEXT,
                    eval REAL,
                    UNIQUE (game_id, ply)
                )"""
        )

        cur.execute(
            """
            SELECT rowid as game_id, pgn 
            FROM games
            WHERE game_id NOT IN (
                SELECT game_id
                FROM positions
            )
            """
        )
        for game_id, pgn in cur.fetchall():
            positions = extract_positions(pgn)
            positions = [(game_id, fen, ply) for ply, fen in positions]
            con.executemany(
                """
                INSERT OR IGNORE INTO 
                    positions(game_id, fen, ply) 
                VALUES 
                    (:game_id, :fen, :ply)
                """,
                (positions),
            )

    cur.close()
    con.close()


In [27]:
def annotate_positions() -> None:
    """Read all positions and run them through stockfish. Write the results back to the db."""

    with sqlite3.connect("games2") as con:
        con.row_factory = dict_factory
        res = con.execute(
            """
            SELECT 
                url, ply, fen 
            FROM positions 
            WHERE eval IS NULL
            """
        )
        res = res.fetchall()

        with chess.engine.SimpleEngine.popen_uci(
            "F:\Downloads\stockfish_15_win_x64_avx2\stockfish_15_x64_avx2.exe"
        ) as engine:
            engine.configure({"Threads": 6, "Use NNUE": True, "Hash": 3000})
            for i, row in enumerate(tqdm(res, unit="positions")):
                board = chess.Board(row["fen"])
                eval = (
                    engine.analyse(
                        board, chess.engine.Limit(depth=10, time=0.5), game=row["url"]
                    )["score"]
                    .white()
                    .score(mate_score=10_000)
                )
                con.execute(
                    """
                    UPDATE positions 
                    SET eval = :eval 
                    WHERE 
                        url = :url 
                        AND ply = :ply
                    """,
                    {"eval": eval, "url": row["url"], "ply": row["ply"]},
                )
                # Reduce overhead for commits
                if i % 100 == 0:
                    con.commit()
    con.close()


In [28]:
def get_material(fen: str):
    """Return the material for white and black
    Source: https://github.com/niklasf/python-chess/discussions/864?sort=top"""
    board = chess.Board(fen)
    white = board.occupied_co[chess.WHITE]
    black = board.occupied_co[chess.BLACK]
    white_points = (
        chess.popcount(white & board.pawns)
        + 3 * chess.popcount(white & board.knights)
        + 3 * chess.popcount(white & board.bishops)
        + 5 * chess.popcount(white & board.rooks)
        + 9 * chess.popcount(white & board.queens)
    )

    black_points = (
        chess.popcount(black & board.pawns)
        + 3 * chess.popcount(black & board.knights)
        + 3 * chess.popcount(black & board.bishops)
        + 5 * chess.popcount(black & board.rooks)
        + 9 * chess.popcount(black & board.queens)
    )

    return white_points, black_points


In [29]:
def write_material() -> None:
    """Write the material value of white and black to the db"""
    with sqlite3.connect("games2") as con:
        con.row_factory = dict_factory
        res = con.execute(
            """
            PRAGMA table_info(positions)
            """
        ).fetchall()
        # Kind of hack
        if "material_black" not in str(res):
            con.execute(
                """
                ALTER TABLE positions
                ADD material_white INTEGER
                """
            )
            con.execute(
                """
                ALTER TABLE positions
                ADD material_black INTEGER
                """
            )

        res = con.execute(
            """
            SELECT 
                url, ply, fen
            FROM positions
            -- WHERE material_black IS NULL
            """
        )
        for i, row in enumerate(res):
            material_white, material_black = get_material(row["fen"])
            con.execute(
                """
                UPDATE positions
                SET 
                    material_white = :white,
                    material_black = :black
                WHERE
                    url = :url
                    AND ply = :ply
                """,
                {
                    "white": material_white,
                    "black": material_black,
                    "url": row["url"],
                    "ply": row["ply"],
                },
            )
            if i % 1000 == 0:
                con.commit()
    con.close()


In [30]:
file_path = "F:\\Dokumente\\git\\schach\\lichess_elite_2022-04.pgn"
n_games = 3
df = create_dataframe(
    n_games=n_games,
    file_path=file_path,
)
write_games(df)
write_positions()
annotate_positions()
write_material()


OperationalError: no such column: url