In [1]:
!rm -rf /kaggle/working/*

In [2]:
!pip install chess
import chess.pgn
import csv
import requests
from bs4 import BeautifulSoup
import zstandard as zstd
import os
from pathlib import Path


Collecting chess
  Downloading chess-1.10.0-py3-none-any.whl.metadata (19 kB)
Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess
Successfully installed chess-1.10.0


In [3]:

# Root of the Lichess open database. fetch_urls scrapes this page for archive
# links and download_and_decompress joins it with "standard/<file name>".
BASE_URL = "https://database.lichess.org/"

# How many archives to keep: fetch_urls returns the first
# NUM_FILES_TO_PROCESS entries of the sorted link list.
NUM_FILES_TO_PROCESS = 1

# Column names for a per-move CSV export.
# NOTE(review): nothing in this cell reads this list, and the extraction cell
# further down rebinds `csv_headers` with a different set of names -- this
# copy looks like a leftover; confirm before relying on or removing it.
csv_headers = [
    "Game_ID", "Move_Number", "Player", "Move", "FEN_Before", "FEN_After", 
    "Is_Capture", "Is_Check", "Is_Checkmate", "Piece_Moved", "Piece_Captured", 
    "Is_Castling", "Is_EnPassant", "Is_Promotion", "Promotion_Type",
    "Result", "Termination"
]

def fetch_urls(timeout=30):
    """Fetch the file names of standard-variant PGN archives from Lichess.

    The index page at ``BASE_URL`` is scraped for ``<a>`` tags whose ``href``
    contains ``standard/`` and ``pgn.zst`` but not ``.torrent``. The leading
    ``standard/`` prefix is stripped, the names are sorted ascending, and the
    first ``NUM_FILES_TO_PROCESS`` of them are returned.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection.

    Returns:
        list[str]: Archive file names, or an empty list when the index page
        could not be fetched (the error is printed, not raised).
    """
    # Only the network call can raise RequestException, so keep the try narrow.
    try:
        response = requests.get(BASE_URL, timeout=timeout)
        response.raise_for_status()  # Turn HTTP 4xx/5xx into an exception
    except requests.RequestException as e:
        print(f"Error fetching URLs: {e}")
        return []

    soup = BeautifulSoup(response.text, 'html.parser')

    urls_with_standard = []
    for a_tag in soup.find_all("a"):
        href = a_tag.get('href')
        if href and "standard/" in href and "pgn.zst" in href and ".torrent" not in href:
            urls_with_standard.append(href.removeprefix("standard/"))

    urls_with_standard.sort()
    print(f"Found {len(urls_with_standard)} URLs")
    return urls_with_standard[:NUM_FILES_TO_PROCESS]  # Limit to desired number

def download_and_decompress(url, timeout=30, chunk_size=1024 * 1024):
    """Download one ``.pgn.zst`` archive and decompress it next to the notebook.

    The archive is streamed to disk in chunks instead of being buffered in
    memory, because the monthly Lichess dumps can be far larger than RAM.
    The temporary ``.zst`` file is removed afterwards, whether or not the
    download and decompression succeeded.

    Args:
        url: Archive file name as returned by ``fetch_urls`` (it is joined
            onto ``BASE_URL`` + ``"standard/"`` to build the download URL).
        timeout: Seconds to wait for the connection and for each read.
        chunk_size: Number of bytes requested per streamed chunk.

    Returns:
        pathlib.Path | None: Path of the decompressed PGN file, or ``None``
        when the download, a file operation, or the decompression failed
        (the error is printed, not raised).
    """
    file_name = url
    full_url = requests.compat.urljoin(BASE_URL, "standard/" + file_name)
    input_file_path = Path(file_name)
    output_file_path = Path(file_name.replace(".zst", ""))

    try:
        # Stream the .pgn.zst file to disk chunk by chunk
        with requests.get(full_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Check for HTTP errors
            with input_file_path.open('wb') as compressed_file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    compressed_file.write(chunk)

        # Decompress the .pgn.zst file
        with input_file_path.open('rb') as compressed_file, output_file_path.open('wb') as output_file:
            dctx = zstd.ZstdDecompressor()
            dctx.copy_stream(compressed_file, output_file)

        print(f"Downloaded and decompressed {file_name}")
        return output_file_path  # Return path of the decompressed file
    except requests.RequestException as e:
        print(f"Error downloading {file_name}: {e}")
        return None
    except OSError as e:
        print(f"File operation error for {file_name}: {e}")
        return None
    except zstd.ZstdError as e:
        # A truncated or corrupted archive surfaces here rather than as OSError
        print(f"Decompression error for {file_name}: {e}")
        return None
    finally:
        # Delete the .pgn.zst file on every exit path so a failed run does
        # not leave a (possibly multi-gigabyte) partial download behind.
        input_file_path.unlink(missing_ok=True)

if __name__ == "__main__":
    # Always bind pgn_file_path, so later cells never pick up a stale value
    # from a previous run when nothing was fetched this time.
    pgn_file_path = None
    urls = fetch_urls()
    for url in urls:
        downloaded_path = download_and_decompress(url)
        # download_and_decompress returns None on failure; keep the most
        # recent successful download instead of overwriting it with None.
        if downloaded_path is not None:
            pgn_file_path = downloaded_path

Found 139 URLs
Downloaded and decompressed lichess_db_standard_rated_2013-01.pgn.zst


In [4]:
import chess
import csv
from pathlib import Path

# Header row that extract_move_data writes as the first line of the CSV.
# The order here must stay in step with the row assembled inside that
# function's move loop (one value per column, 18 in total).
csv_headers = [
    "Game Number", "Move Number", "Player", "Move (SAN)", "FEN Before", "FEN After",
    "Is Capture", "Is Check", "Is Checkmate", "Piece Moved", "Piece Captured",
    "Is Castling", "Is En Passant", "Is Promotion", "Promotion Type",
    "Result", "Termination","Event"
]

def extract_move_data(pgn_file_path, csv_file_path, progress_every=1000):
    """Extract one CSV row per mainline move from every game in a PGN file.

    Each row holds the game counter, the move counter within the game, the
    side to move, the SAN of the move, the FEN before and after the move,
    capture / check / checkmate / castling / en-passant / promotion flags,
    the moved and captured piece letters, and the game's ``Result``,
    ``Termination`` and ``Event`` headers (``"Unknown"`` when a header is
    missing). The header row comes from the module-level ``csv_headers``.

    Args:
        pgn_file_path: Path of the PGN file to read (opened as UTF-8).
        csv_file_path: Path of the CSV file to create or overwrite (UTF-8).
        progress_every: Print a progress line each time this many games have
            been processed. Pass ``0`` or ``None`` to silence progress output.

    Returns:
        None. The CSV is written as a side effect and totals are printed.
    """
    game_count = 0

    # Both files are opened with an explicit encoding so the result does not
    # depend on the platform's default locale.
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(csv_headers)

        with open(pgn_file_path, 'r', encoding='utf-8') as pgn_file:
            while True:
                game = chess.pgn.read_game(pgn_file)
                if game is None:
                    break  # End of file
                game_count += 1

                result = game.headers.get("Result", "Unknown")
                gametype = game.headers.get("Event", "Unknown")
                termination = game.headers.get("Termination", "Unknown")

                # Traverse the game move by move
                board = game.board()
                for move_number, move in enumerate(game.mainline_moves(), start=1):
                    # --- Everything that needs the position BEFORE the move ---
                    fen_before = board.fen()
                    player = "White" if board.turn else "Black"
                    move_san = board.san(move)
                    is_capture = board.is_capture(move)
                    is_castling = board.is_castling(move)
                    is_en_passant = board.is_en_passant(move)

                    moving_piece = board.piece_at(move.from_square)
                    piece_moved = moving_piece.symbol().upper() if moving_piece else None

                    if is_en_passant:
                        # The captured pawn is not on the destination square:
                        # it sits on the destination file, on the rank the
                        # capturing pawn started from.
                        ep_square = chess.square(
                            chess.square_file(move.to_square),
                            chess.square_rank(move.from_square),
                        )
                        piece_captured = board.piece_at(ep_square).symbol().upper()
                    elif is_capture:
                        piece_captured = board.piece_at(move.to_square).symbol().upper()
                    else:
                        piece_captured = None

                    is_promotion = move.promotion is not None
                    promotion_type = chess.PIECE_SYMBOLS[move.promotion].upper() if is_promotion else None

                    # --- Play the move once, then read the resulting position ---
                    board.push(move)
                    is_check = board.is_check()
                    is_checkmate = board.is_checkmate()
                    fen_after = board.fen()

                    csv_writer.writerow([
                        game_count, move_number, player, move_san, fen_before, fen_after,
                        is_capture, is_check, is_checkmate, piece_moved, piece_captured,
                        is_castling, is_en_passant, is_promotion, promotion_type,
                        result, termination, gametype
                    ])

                if progress_every and game_count % progress_every == 0:
                    print(f"Processed {game_count} games...")

    print(f"Total number of games in the PGN file: {game_count}")
    print(f"Move-specific data saved to {csv_file_path}")

# Example usage
# download_and_decompress returns None when the download or decompression
# failed, so stop here with a clear message instead of an AttributeError
# on None.with_suffix.
if pgn_file_path is None:
    raise RuntimeError("No PGN file is available: the download step did not produce a file.")
csv_file_path = pgn_file_path.with_suffix('.csv')
extract_move_data(pgn_file_path, csv_file_path)

Processed 1000 games...
Processed 2000 games...
Processed 3000 games...
Processed 4000 games...
Processed 5000 games...
Processed 6000 games...
Processed 7000 games...
Processed 8000 games...
Processed 9000 games...
Processed 10000 games...
Processed 11000 games...
Processed 12000 games...
Processed 13000 games...
Processed 14000 games...
Processed 15000 games...
Processed 16000 games...
Processed 17000 games...
Processed 18000 games...
Processed 19000 games...
Processed 20000 games...
Processed 21000 games...
Processed 22000 games...
Processed 23000 games...
Processed 24000 games...
Processed 25000 games...
Processed 26000 games...
Processed 27000 games...
Processed 28000 games...
Processed 29000 games...
Processed 30000 games...
Processed 31000 games...
Processed 32000 games...
Processed 33000 games...
Processed 34000 games...
Processed 35000 games...
Processed 36000 games...
Processed 37000 games...
Processed 38000 games...
Processed 39000 games...
Processed 40000 games...
Processed