In [1]:
import chess.pgn
import pandas as pd

In [2]:
ds = pd.read_csv("data/chess_game_0001.csv", index_col=0)
ds

Unnamed: 0,Moves,Termination,Result
0,['d2d4' 'f7f5' 'g2g3' 'g7g6' 'f1g2' 'f8g7' 'g1...,FIVEFOLD_REPETITION,1/2-1/2
1,['e2e4' 'e7e6' 'd2d4' 'd7d5' 'b1c3' 'f8b4' 'e4...,CHECKMATE,1-0
2,['d2d4' 'g8f6' 'c2c4' 'e7e5' 'd4e5' 'f6g4' 'c1...,INSUFFICIENT_MATERIAL,1/2-1/2
3,['c2c4' 'g8f6' 'b1c3' 'e7e5' 'g2g3' 'g7g6' 'f1...,CHECKMATE,1-0
4,['d2d4' 'g8f6' 'c2c4' 'e7e6' 'b1c3' 'f8b4' 'd1...,CHECKMATE,1-0
...,...,...,...
995,['e2e4' 'c7c6' 'd2d4' 'd7d5' 'b1c3' 'd5e4' 'c3...,FIVEFOLD_REPETITION,1/2-1/2
996,['d2d4' 'g8f6' 'c2c4' 'g7g6' 'b1c3' 'f8g7' 'e2...,CHECKMATE,1-0
997,['d2d4' 'd7d5' 'c2c4' 'e7e6' 'g1f3' 'g8f6' 'b1...,INSUFFICIENT_MATERIAL,1/2-1/2
998,['e2e4' 'e7e5' 'g1f3' 'b8c6' 'f1b5' 'a7a6' 'b5...,FIVEFOLD_REPETITION,1/2-1/2


In [3]:
from typing import List
import chess
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def get_uci_game(game: str) -> List[str]:
    game = game.strip("[]")
    game = game.replace("\n", "")
    game = game.replace("'", "")
    game = game.split(" ")
    return game


def get_board_states(uci_game: List[str]) -> List[str]:
    board = chess.Board()
    board.reset()
    board_states = [board.fen()]
    for move in uci_game:
        move = chess.Move.from_uci(move)
        board.push(move)
        board_states.append(board.fen())
    return board_states

In [9]:
import sqlite3
from typing import Tuple


def connect_chess_db(db_name: str) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
    conn = sqlite3.connect(db_name, check_same_thread=False)
    cursor = conn.cursor()
    return conn, cursor


def create_tables(cursor: sqlite3.Cursor):
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS games (
        id INTEGER PRIMARY KEY,
        result TEXT,
        termination TEXT
    )
    """)    

    cursor.execute("""
    CREATE TABLE IF NOT EXISTS moves (
        id INTEGER PRIMARY KEY,
        game_id INTEGER,
        move_number INTEGER,
        move_id INTEGER,
        board_fen_id INTEGER,
        FOREIGN KEY(game_id) REFERENCES games(id),
        FOREIGN KEY(move_id) REFERENCES move_collection(id),
        FOREIGN KEY(game_id) REFERENCES games(id)
    )
    """)
    
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS board_states (
        id INTEGER PRIMARY KEY,
        board_fen TEXT,
        UNIQUE(board_fen)
    )
    """)
    
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS move_collection (
        id INTEGER PRIMARY KEY,
        move TEXT,
        UNIQUE(move)
    )
    """)


def insert_if_not_exists(cursor, table, column, value):
    cursor.execute(f"INSERT OR IGNORE INTO {table} ({column}) VALUES (?)", (value,))
    cursor.execute(f"SELECT id FROM {table} WHERE {column} = ?", (value,))
    return cursor.fetchone()[0]

def insert_game(cursor: sqlite3.Cursor, move_sequence: List[str], board_fens: List[str], result: str, termination: str):
    """
    Inserts a game and its moves into the database.
    
    - move_sequence: list of SAN move strings
    - board_fens: list of FEN strings *after each move*
    - result: string (e.g. '1-0')
    - termination: string (e.g. 'checkmate')
    """

    # 1. Insert into games
    cursor.execute("""
        INSERT INTO games (result, termination) VALUES (?, ?)
    """, (result, termination))
    game_id = cursor.lastrowid

    # 2. Insert moves and board states
    for move_number, (move_str, fen_str) in enumerate(zip(move_sequence, board_fens), start=1):
        # Insert (or get) move id
        move_id = insert_if_not_exists(cursor, "move_collection", "move", move_str)

        # Insert (or get) board_fen_id
        board_fen_id = insert_if_not_exists(cursor, "board_states", "board_fen", fen_str)

        # Insert into moves
        cursor.execute("""
            INSERT INTO moves (game_id, move_number, move_id, board_fen_id)
            VALUES (?, ?, ?, ?)
        """, (game_id, move_number, move_id, board_fen_id))

    return game_id



#def insert_into_db(cursor: sqlite3.Cursor, moves, termination, result, board_fen):
#    cursor.execute(
#        "INSERT INTO games (result, termination) VALUES (?, ?)", (result, termination)
#    )
#    game_id = cursor.lastrowid
#
#    for i, move in enumerate(moves):
#
#        cursor.execute(
#            "INSERT INTO moves (game_id, move_number, move_id, board_fen_id) VALUES (?, ?, ?, ?)",
#            (game_id, i + 1, move, board_fen[i + 1]),
#        )


conn, cursor = connect_chess_db("data/chess_games_4.db")
create_tables(cursor)

for game_idx in tqdm(ds.index):
    moves, termination, result = ds.iloc[game_idx]
    moves = get_uci_game(moves)
    board_fen = get_board_states(moves)
    insert_game(cursor, moves, board_fen, termination, result, )
conn.commit()
conn.close()

  0%|          | 0/1000 [00:00<?, ?it/s]

100%|██████████| 1000/1000 [00:11<00:00, 90.22it/s]


In [None]:
import sqlite3
import pandas as pd

def fetch_games_with_moves(db_path="chess.db", filters=None):
    conn = sqlite3.connect(db_path)

    # Optional filters (e.g. result='1-0') as WHERE conditions
    where_clause = ""
    params = []
    if filters:
        conditions = []
        for key, value in filters.items():
            conditions.append(f"g.{key} = ?")
            params.append(value)
        where_clause = "WHERE " + " AND ".join(conditions)

    # SQL query with JOINs to get readable move and board info
    query = f"""
    SELECT
        g.id AS game_id,
        g.result,
        g.termination,
        m.move_number,
        mc.move,
        bs.board_fen
    FROM games g
    JOIN moves m ON g.id = m.game_id
    JOIN move_collection mc ON m.move_id = mc.id
    JOIN board_states bs ON m.board_fen_id = bs.id
    {where_clause}
    ORDER BY g.id, m.move_number
    """

    df = pd.read_sql_query(query, conn, params=params)
    conn.close()
    return df



def fetch_games(db_path="chess.db", filters=None):
    conn = sqlite3.connect(db_path)

    where_clause = ""
    params = []

    if filters:
        conditions = []
        for column, operator, value in filters:
            qualified_column = f"g.{column}"
            conditions.append(f"{qualified_column} {operator} ?")
            params.append(value)
        where_clause = "WHERE " + " AND ".join(conditions)

    query = f"""
    SELECT
        g.id AS game_id,
        g.result,
        g.termination
    FROM games g
    {where_clause}
    ORDER BY g.id
    """

    df = pd.read_sql_query(query, conn, params=params)
    conn.close()
    return df


def fetch_moves(db_path="chess.db", filters=None):
    import sqlite3
    import pandas as pd

    conn = sqlite3.connect(db_path)

    where_clause = ""
    params = []

    if filters:
        conditions = []
        for column, operator, value in filters:
            # Fully qualify known fields
            qualified_column = {
                'move': 'mc.move',
                'board_fen': 'bs.board_fen',
                'move_number': 'm.move_number',
                'game_id': 'm.game_id'
            }.get(column, f"m.{column}")
            conditions.append(f"{qualified_column} {operator} ?")
            params.append(value)
        where_clause = "WHERE " + " AND ".join(conditions)

    query = f"""
    SELECT
        m.id AS move_id,
        m.game_id,
        m.move_number,
        mc.move,
        bs.board_fen
    FROM moves m
    JOIN move_collection mc ON m.move_id = mc.id
    JOIN board_states bs ON m.board_fen_id = bs.id
    {where_clause}
    ORDER BY m.game_id, m.move_number
    """

    df = pd.read_sql_query(query, conn, params=params)
    conn.close()
    return df

filters = [
    ("move_number", "<=", 100),
    # ("move", "=", "e4"),
    # ("game_id", "=", 1)
]
fetch_moves("data/chess_games_4.db", filters=filters)

Unnamed: 0,move_id,game_id,move_number,move,board_fen
0,1,1,1,d2d4,rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w ...
1,2,1,2,f7f5,rnbqkbnr/pppppppp/8/8/3P4/8/PPP1PPPP/RNBQKBNR ...
2,3,1,3,g2g3,rnbqkbnr/ppppp1pp/8/5p2/3P4/8/PPP1PPPP/RNBQKBN...
3,4,1,4,g7g6,rnbqkbnr/ppppp1pp/8/5p2/3P4/6P1/PPP1PP1P/RNBQK...
4,5,1,5,f1g2,rnbqkbnr/ppppp2p/6p1/5p2/3P4/6P1/PPP1PP1P/RNBQ...
...,...,...,...,...,...
98865,189822,1000,96,g8f6,6n1/5pk1/p3r1p1/P6p/8/2Q4P/6P1/6K1 b - - 10 48
98866,189823,1000,97,c3b2,8/5pk1/p3rnp1/P6p/8/2Q4P/6P1/6K1 w - - 11 49
98867,189824,1000,98,e6e1,8/5pk1/p3rnp1/P6p/8/7P/1Q4P1/6K1 b - - 12 49
98868,189825,1000,99,g1h2,8/5pk1/p4np1/P6p/8/7P/1Q4P1/4r1K1 w - - 13 50


In [5]:
ds = pd.read_parquet("data/chess_game_0001.parquet")
ds = ds.iloc[:100_000]

In [None]:
conn, cursor = connect_chess_db("data/chess_games.db")
create_tables(cursor)


for game_idx in tqdm(ds.index):
    
conn.commit()
conn.close()

100%|██████████| 100000/100000 [08:34<00:00, 194.25it/s]


In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

def insert_into_db(cursor: sqlite3.Cursor, game_idx, moves, termination, result, board_fen):
    cursor.execute(
        "INSERT INTO games (id, result, termination) VALUES (?, ?, ?)", (game_idx, result, termination)
    )
    for i, move in enumerate(moves):
        cursor.execute(
            "INSERT INTO moves (game_id, move_number, move, board_fen) VALUES (?, ?, ?, ?)",
            (game_idx, i + 1, move, board_fen[i + 1]),
        )
def dummy_func(cursor: sqlite3.Cursor, game_indices: np.ndarray, df_data: pd.DataFrame):
    if len(game_indices) == 0:
        return 
    for game_idx in game_indices:
        moves, termination, result = df_data.iloc[int(game_idx)]
        board_fen = get_board_states(moves)
        insert_into_db(cursor, game_idx, moves, termination, result, board_fen)
        
conn, cursor = connect_chess_db("data/chess_games_2.db")
create_tables(cursor)

chunk_size = 100
n_chunks = int(np.ceil(len(ds) / chunk_size))
prog_bar = tqdm(len(ds))
with ThreadPoolExecutor(4) as executor:
    futures = []
    chunks = np.split(np.arange(len(ds)), np.cumsum((np.ones(n_chunks, dtype=int) * chunk_size)))
    for indices in chunks:
        futures.append(executor.submit(dummy_func, cursor=cursor, game_indices=indices, df_data=ds.iloc[indices]))
        
    for _ in as_completed(futures):
        prog_bar.n +=1
        prog_bar.refresh()
conn.commit()
conn.close()

1001it [00:06, 158.89it/s]
89it [00:00, 407.98it/s]

done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done


1001it [00:05, 199.13it/s] 

In [None]:
def dummy_func(cursor: sqlite3.Cursor, game_indices: np.ndarray, df_data: pd.DataFrame):
    if len(game_indices) == 0:
        return 
    for game_idx in game_indices:
        moves, termination, result = df_data.iloc[int(game_idx)]
        board_fen = get_board_states(moves)
        insert_into_db(cursor, game_idx, moves, termination, result, board_fen)

In [None]:
import chess.pgn
import sqlite3
import os

# Initialize SQLite database

# Create tables

# Function to process PGN file
def process_pgn_file(pgn_path):
    with open(pgn_path) as pgn:
        while True:
            game = chess.pgn.read_game(pgn)
            if game is None:
                break

            result = game.headers.get("Result", "")
            termination = game.headers.get("Termination", "")

            # Insert game metadata
            cursor.execute("INSERT INTO games (result, termination) VALUES (?, ?)", (result, termination))
            game_id = cursor.lastrowid

            board = game.board()
            for i, move in enumerate(game.mainline_moves()):
                board.push(move)
                cursor.execute('''
                    INSERT INTO moves (game_id, move_number, move, board_fen)
                    VALUES (?, ?, ?, ?)
                ''', (game_id, i + 1, board.san(move), board.fen()))

    conn.commit()

# Example: process a file called 'games.pgn'
process_pgn_file("games.pgn")

# Close the database
conn.close()
