# Get Data from file

In [49]:
class MoveDictionary:
    """Two-way mapping between UCI move strings and integer indices.

    The move catalogue is built once, at construction time, by walking every
    square in file-major order (a1, a2, ..., h8) and collecting every
    queen-like and knight-like destination, plus the four promotion variants
    for single-file-step moves from rank 7 to 8 and from rank 2 to 1.

    Attributes set by ``__init__``:
        move_index_dict: maps a UCI string (e.g. ``"e2e4"``) to its index.
        index_move_dict: maps an index back to its UCI string.
    """

    def __init__(self):
        catalogue = self.generate_all_moves()
        self.move_index_dict = {}
        self.index_move_dict = {}
        for position, uci in enumerate(catalogue):
            self.move_index_dict[uci] = position
            self.index_move_dict[position] = uci

    def get_all_legal_moves(self, fen):
        """Return the catalogue indices of every legal move in position ``fen``."""
        position = chess.Board(fen)
        return [self.move_index_dict[candidate.uci()] for candidate in position.legal_moves]

    def generate_all_squares(self):
        """Return the 64 square names ordered file by file: a1..a8, b1..b8, ..."""
        squares = []
        for file_char in 'abcdefgh':
            for rank_char in '12345678':
                squares.append(file_char + rank_char)
        return squares

    def is_within_board(self, file, rank):
        """Return True when ``file`` lies in a..h and ``rank`` lies in 1..8."""
        off_board = file < 'a' or file > 'h' or rank < '1' or rank > '8'
        return not off_board

    def move_in_direction(self, start_square, file_step, rank_step, steps=8):
        """Walk from ``start_square`` in one direction and collect the squares hit.

        The walk advances by (``file_step``, ``rank_step``) up to ``steps``
        times and stops at the first square that falls off the board.
        """
        origin_file = start_square[0]
        origin_rank = start_square[1]
        reached = []
        distance = 1
        while distance <= steps:
            file_char = chr(ord(origin_file) + file_step * distance)
            rank_char = chr(ord(origin_rank) + rank_step * distance)
            if not self.is_within_board(file_char, rank_char):
                return reached
            reached.append(file_char + rank_char)
            distance += 1
        return reached

    def generate_fairy_moves(self, start_square):
        """Return every destination reachable by a queen or a knight from ``start_square``.

        Sliding directions come first (rook-like, then bishop-like), followed
        by the eight knight jumps; this ordering determines the move indices.
        """
        sliding_steps = [
            (1, 0), (-1, 0), (0, 1), (0, -1),    # along ranks and files
            (1, 1), (1, -1), (-1, 1), (-1, -1),  # along diagonals
        ]
        knight_jumps = [
            (2, 1), (2, -1), (-2, 1), (-2, -1),
            (1, 2), (1, -2), (-1, 2), (-1, -2),
        ]
        destinations = []
        for d_file, d_rank in sliding_steps:
            destinations += self.move_in_direction(start_square, d_file, d_rank)
        for d_file, d_rank in knight_jumps:
            # A knight jump is a single hop, never a ray.
            destinations += self.move_in_direction(start_square, d_file, d_rank, steps=1)
        return destinations

    def generate_promotion_moves(self, start_square, end_square):
        """Return the four UCI promotion strings (b, n, r, q) for one pawn move."""
        base = start_square + end_square
        return [base + piece for piece in ('b', 'n', 'r', 'q')]

    def generate_all_moves(self):
        """Build the full ordered move catalogue used for indexing."""
        catalogue = []
        for origin in self.generate_all_squares():
            for target in self.generate_fairy_moves(origin):
                catalogue.append(origin + target)

                # Promotions only arise when the file changes by at most one.
                if abs(ord(origin[0]) - ord(target[0])) > 1:
                    continue
                rank_pair = (origin[1], target[1])
                if rank_pair == ('7', '8'):  # white pawn reaching the last rank
                    catalogue.extend(self.generate_promotion_moves(origin, target))
                if rank_pair == ('2', '1'):  # black pawn reaching the first rank
                    catalogue.extend(self.generate_promotion_moves(origin, target))
        return catalogue

In [51]:
def fen_to_vector(fen):
    fen_parts = fen.split(" ")

    # If black to move, flip the board
    if fen_parts[1] == "b":
        fen_parts[0] = fen_parts[0][::-1].swapcase()  # Reverse and swapcase for black perspective
        fen_parts[2] = fen_parts[2].swapcase()  # Swap castling rights

    # Your corrected piece_dict
    piece_dict = {
        " ": "1,", "p": "2,", "n": "3,", "b": "4,", "r": "5,", "q": "6,", "k": "7,", 
        "P": "8,", "N": "9,", "B": "10,", "R": "11,", "Q": "12,", "K": "13,"
    }

    position = ["0,"]  # Special token to start the vector

    # Build the board position vector
    for row in fen_parts[0].split("/"):
        for square in row:
            if square.isdigit():
                # Add the appropriate number of "1,"s for empty squares
                position.extend(["1,"] * int(square))
            else:
                position.append(piece_dict[square])  # Get the piece value from the dictionary

    # Castling rights
    castling_rights = fen_parts[2]
    position.append("1," if "K" in castling_rights else "0,")
    position.append("1," if "Q" in castling_rights else "0,")
    position.append("1," if "k" in castling_rights else "0,")
    position.append("1," if "q" in castling_rights else "0,")

    # En passant square
    en_passant = fen_parts[3]
    if en_passant == "-":
        position.extend(["0,"] * 9)  # No en passant
    else:
        file_index = ord(en_passant[0]) - 97
        position.append("1,")  # En passant available
        position.extend(["0,"] * file_index)  # Empty squares before en passant file
        position.append("1,")  # En passant file
        position.extend(["0,"] * (7 - file_index))  # Empty squares after en passant file

    # Join the list into a single string and remove the trailing comma
    return "".join(position).rstrip(",")

In [54]:
import chess.pgn
import time
import pandas as pd
import re
import io

def get_game_result(game):
    """
    Translate the PGN "Result" header of a game into a numeric outcome.

    Returns:
    - 1 for "1-0" (White wins)
    - -1 for "0-1" (Black wins)
    - 0 for "1/2-1/2" (draw)
    - None for any other value
    """
    outcome_by_tag = {
        "1-0": 1,       # White wins
        "0-1": -1,      # Black wins
        "1/2-1/2": 0,   # Draw
    }
    return outcome_by_tag.get(game.headers["Result"])

def game_matches_criteria(game_pgn, expected_termination, min_avg_elo):
    """
    Decide from the raw PGN text whether a game should be kept.

    A game is rejected when it carries a Termination tag that differs
    (case-insensitively) from expected_termination; a game without that tag
    is not rejected on this ground. It is then accepted only if both the
    WhiteElo and BlackElo tags hold plain integers whose mean is at least
    min_avg_elo.
    """
    termination_tag = re.search(r'\[Termination "([^"]+)"\]', game_pgn)
    if termination_tag:
        found = termination_tag.group(1).lower()
        if found != expected_termination.lower():
            return False

    rating_tags = (
        re.search(r'\[WhiteElo "(\d+)"\]', game_pgn),
        re.search(r'\[BlackElo "(\d+)"\]', game_pgn),
    )
    # Non-numeric ratings such as "?" do not match and disqualify the game.
    if not all(rating_tags):
        return False

    mean_rating = sum(int(tag.group(1)) for tag in rating_tags) / 2
    return mean_rating >= min_avg_elo

In [55]:
import time
import chess.pgn
import pandas as pd
import io
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_single_game(game, game_count, move_to_index):
    """
    Processes a single game, extracting move-by-move training rows.

    Only the moves played by the winning side are kept; for a draw (or an
    unknown result, which is treated as a draw) the moves of both sides are
    kept.

    Parameters:
    - game: chess.pgn.Game object representing a single chess game. Its
      headers must contain integer "WhiteElo" and "BlackElo" values.
    - game_count: integer, the count of the game being processed; stored in
      every row as 'game_number'.
    - move_to_index: mapping from a UCI move string to its integer index
      (e.g. MoveDictionary().move_index_dict).

    Returns:
    - List of dictionaries, one per kept move, with the keys 'game_number',
      'current_fen' (position before the move), 'board_state' (encoded
      position), 'next_move' (index of the move played), 'moves_left'
      (plies remaining after this move), 'result_from_perspective' (1 for a
      decisive game, 0 for a draw) and 'average_elo'.

    Raises:
    - KeyError if an Elo header is missing or a move is not in move_to_index.
    - ValueError if an Elo header is not an integer.
    """
    # Use get_game_result function to determine the result of the game
    game_result = get_game_result(game)
    if game_result is None:
        game_result = 0  # Default to a draw if the result is unknown

    # Extract Elo ratings and compute the average Elo
    white_elo = int(game.headers["WhiteElo"])
    black_elo = int(game.headers["BlackElo"])
    average_elo = (white_elo + black_elo) / 2

    board = game.board()
    move_lst = list(game.mainline_moves())
    total_moves = len(move_lst)

    # Every kept row belongs to the winner (1) or to a drawn game (0)
    result_from_perspective = 1 if game_result != 0 else 0
    game_data = []

    for move_number, move in enumerate(move_lst, start=1):
        # Ask the board who is to move instead of assuming White starts, so
        # games that begin from a custom position are attributed correctly.
        side_to_move = 1 if board.turn else -1
        current_fen = board.fen()
        board.push(move)

        # Store data only for the winning player or both if it's a draw
        if game_result == 0 or game_result == side_to_move:
            game_data.append({
                'game_number': game_count,
                'current_fen': current_fen,
                'board_state': fen_to_vector(current_fen),
                'next_move': move_to_index[move.uci()],
                'moves_left': total_moves - move_number,
                'result_from_perspective': result_from_perspective,
                'average_elo': average_elo
            })

    return game_data

In [57]:
import time
import chess.pgn
import pandas as pd
import io
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

def _iter_raw_games(pgn_file):
    """Yield the raw text of each game in an open PGN stream, one string per game.

    A new game starts at every line beginning with '[Event '. The text
    buffered at end of file is yielded too, so the last game is not lost.
    """
    buffered_lines = []
    for line in pgn_file:
        if line.startswith('[Event ') and buffered_lines:
            yield "".join(buffered_lines)
            buffered_lines = []
        buffered_lines.append(line)
    if buffered_lines:
        yield "".join(buffered_lines)


def extract_game_data(file_path, expected_termination="normal", max_games=500_000, min_avg_elo=2000):
    """
    Read a PGN file and build a move-by-move DataFrame of the games that
    pass the termination and average-Elo filters.

    Parameters:
    - file_path: path of the PGN file to read (decoded as UTF-8; undecodable
      bytes are replaced rather than aborting the run).
    - expected_termination: Termination tag value a game must have
      (compared case-insensitively by game_matches_criteria).
    - max_games: stop once this many games have been processed successfully.
    - min_avg_elo: minimum average of WhiteElo and BlackElo.

    Returns:
    - (df, time_taken): the DataFrame of rows produced by
      process_single_game, and the wall-clock duration in seconds.

    Games that fail to parse or process are reported with a printed message
    and skipped. Progress is printed roughly every 10 seconds.
    """
    game_data = []
    game_count = 0
    start_time = time.time()
    last_print_time = start_time

    # Initialize the move dictionary
    move_dict_obj = MoveDictionary()
    move_to_index = move_dict_obj.move_index_dict

    # Initialize cumulative timers
    cumulative_read_time = 0
    cumulative_process_time = 0

    with open(file_path, encoding="utf-8", errors="replace") as pgn_file:
        for game_pgn in _iter_raw_games(pgn_file):
            if game_matches_criteria(game_pgn, expected_termination, min_avg_elo):
                try:
                    # Time tracking for reading game
                    read_start_time = time.time()
                    game = chess.pgn.read_game(io.StringIO(game_pgn))
                    cumulative_read_time += time.time() - read_start_time

                    if game is not None:
                        # Time tracking for processing game
                        process_start_time = time.time()
                        processed_game_data = process_single_game(game, game_count, move_to_index)
                        cumulative_process_time += time.time() - process_start_time

                        # Append processed game data
                        game_data.extend(processed_game_data)
                        game_count += 1

                except Exception as e:
                    # Best effort: one malformed game must not abort the whole run
                    print(f"Error processing game: {e}")

                if game_count >= max_games:
                    break

            # Check if 10 seconds have passed since the last print
            current_time = time.time()
            if current_time - last_print_time >= 10:
                elapsed_time = current_time - start_time
                print(f"Processed {game_count} games. Time elapsed: {elapsed_time:.2f} seconds.")
                print(f"Cumulative read time for 'read_game': {cumulative_read_time:.2f} seconds.")
                print(f"Cumulative process time for 'process_single_game': {cumulative_process_time:.2f} seconds.")
                print()
                last_print_time = current_time  # Reset the last print time

    end_time = time.time()
    time_taken = end_time - start_time

    # Final cumulative time printout
    print(f"Total cumulative time for reading games: {cumulative_read_time:.2f} seconds.")
    print(f"Total cumulative time for processing games: {cumulative_process_time:.2f} seconds.")
    print(f"Total time taken: {time_taken:.2f} seconds.")

    df = pd.DataFrame(game_data)
    return df, time_taken

In [61]:
# Example usage: build the move-by-move DataFrame from a local Lichess PGN file.
# NOTE(review): the path below is machine-specific; point it at your own copy.
pgn_file = r"C:\Users\bluni\Chess-Transformer\large_datasets\lichess_db_standard_rated_2024-08.pgn"
# Stop after 5,000 accepted games; keep only games with termination "normal"
# and an average Elo of at least 2000.
game_df, time_taken = extract_game_data(pgn_file, expected_termination="normal", max_games=5_000, min_avg_elo=2000)
print(f"Processed in {time_taken:.2f} seconds.")
# Bare expression: displays the DataFrame when run as a notebook cell.
game_df

Processed 2669 games. Time elapsed: 10.00 seconds.
Cumulative read time for 'read_game': 3.18 seconds.
Cumulative process time for 'process_single_game': 6.59 seconds.

Total cumulative time for reading games: 6.02 seconds.
Total cumulative time for processing games: 12.28 seconds.
Total time taken: 18.72 seconds.
Processed in 18.72 seconds.


Unnamed: 0,game_number,current_fen,board_state,next_move,moves_left,result_from_perspective,average_elo
0,0,rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR ...,"0,5,3,4,7,6,4,3,5,2,2,2,1,2,2,2,2,1,1,1,1,1,1,...",663,46,1,2507.5
1,0,rnbqkbnr/pp1ppppp/8/2p5/4P3/5N2/PPPP1PPP/RNBQK...,"0,5,1,4,7,6,4,3,5,2,2,2,1,2,2,2,2,1,1,3,1,1,1,...",1198,44,1,2507.5
2,0,rnbqkbnr/pp1p1ppp/4p3/2p5/4P3/2P2N2/PP1P1PPP/R...,"0,5,1,4,7,6,4,3,5,2,2,2,1,2,1,2,2,1,1,3,1,1,2,...",931,42,1,2507.5
3,0,rnbqkbnr/pp3ppp/4p3/2pp4/3PP3/2P2N2/PP3PPP/RNB...,"0,5,1,4,7,6,4,3,5,2,2,2,1,1,1,2,2,1,1,3,1,1,2,...",867,40,1,2507.5
4,0,rnbqkbnr/pp3ppp/4p3/2p1N3/3Pp3/2P5/PP3PPP/RNBQ...,"0,5,1,4,7,6,4,3,5,2,2,2,1,1,1,2,2,1,1,1,1,1,2,...",601,38,1,2507.5
...,...,...,...,...,...,...,...
189519,4999,rn1qkbnr/ppp2ppp/4p3/3pPb2/3P4/8/PPP2PPP/RNBQK...,"0,5,3,1,6,7,4,3,5,2,2,2,1,1,2,2,2,1,1,1,1,2,1,...",485,8,1,2043.5
189520,4999,rn1qkbnr/pp3ppp/4p3/2ppPb2/2PP4/8/PP3PPP/RNBQK...,"0,5,3,1,6,7,4,3,5,2,2,1,1,1,2,2,2,1,1,1,1,2,1,...",1539,6,1,2043.5
189521,4999,rn1qkbnr/pp3ppp/4p3/2ppP3/2PP2b1/5N2/PP3PPP/RN...,"0,5,3,1,6,7,4,3,5,2,2,1,1,1,2,2,2,1,1,1,1,2,1,...",565,4,1,2043.5
189522,4999,rn2kbnr/pp3ppp/4p3/2pqP3/3P2b1/5N2/PP3PPP/RNBQ...,"0,5,3,1,1,7,4,3,5,2,2,1,1,1,2,2,2,1,1,1,1,2,1,...",232,2,1,2043.5


# Save Data

In [62]:
import sqlite3

def create_database(db_path):
    """Create the chess_analysis table in the SQLite database at db_path.

    The database file is created if it does not exist, and the table is only
    created when missing, so calling this repeatedly is safe. The table has
    one column per DataFrame column plus an auto-incrementing id.

    Parameters:
    - db_path: path of the SQLite database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" only commits or rolls back the transaction; it does not
        # close the connection, hence the explicit close() below.
        with conn:
            # SQL statement to create a new table with columns matching your dataframe
            conn.execute('''
            CREATE TABLE IF NOT EXISTS chess_analysis (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                game_number INTEGER,
                current_fen TEXT,
                board_state TEXT,
                next_move INTEGER,
                moves_left INTEGER,
                result_from_perspective REAL,
                average_elo REAL
            );
        ''')
    finally:
        conn.close()

    print("Database created and table initialized for chess analysis.")


In [63]:
def batch_insert_data(db_path, df, batch_size):
    """Insert the rows of df into the chess_analysis table in batches.

    Parameters:
    - db_path: path of the SQLite database file; the chess_analysis table
      must already exist.
    - df: DataFrame with the columns game_number, current_fen, board_state,
      next_move, moves_left, result_from_perspective and average_elo. An
      empty DataFrame inserts nothing.
    - batch_size: number of rows sent per executemany call; each batch is
      committed on its own.

    Raises:
    - KeyError if a non-empty df lacks one of the expected columns.
    - sqlite3.Error on database failures; batches committed before the
      failure stay in the database.
    """
    columns = [
        'game_number', 'current_fen', 'board_state', 'next_move',
        'moves_left', 'result_from_perspective', 'average_elo',
    ]
    insert_sql = '''
        INSERT INTO chess_analysis (game_number, current_fen, board_state, next_move, moves_left, result_from_perspective, average_elo)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    '''

    # An empty frame may not have the expected columns at all, so only select
    # them when there is something to insert.
    if len(df):
        rows = df[columns].itertuples(index=False, name=None)
    else:
        rows = ()

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        batch = []
        for row in rows:
            batch.append(row)

            # Flush once the batch size is reached
            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                batch = []

        # Insert any remaining data in the batch
        if batch:
            cursor.executemany(insert_sql, batch)
            conn.commit()
    finally:
        # The connection is not closed automatically; release it explicitly.
        conn.close()


In [67]:
def recreate_table(db_path):
    """Drop the chess_analysis table from the SQLite database at db_path.

    Despite the name, this only drops the table (if it exists); call
    create_database afterwards to create it again. All rows are lost.

    Parameters:
    - db_path: path of the SQLite database file (created if missing).
    """
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" commits on success and rolls back on error, but leaves
        # the connection open; it is closed explicitly below.
        with conn:
            # Drop the existing table
            conn.execute('DROP TABLE IF EXISTS chess_analysis')
    finally:
        conn.close()

    print("Dropped existing chess_analysis table.")

In [68]:
# Define the database file path (relative to the current working directory)
db_path = 'chess_database_lichess_01.db'

# Start from a clean slate: drop any existing chess_analysis table (its rows
# are lost), then create the table again.
recreate_table(db_path)
create_database(db_path)

# `game_df` is the DataFrame returned by extract_game_data above
batch_size = 10000  # Rows per executemany call/commit; adjust as needed

# Insert data into the database
batch_insert_data(db_path, game_df, batch_size)
print("Inserted data into database")


Dropped existing chess_analysis table.
Database created and table initialized for chess analysis.
Inserted data into database
