In [4]:
# %pip install chess

In [1]:
import os
import random
from uuid import uuid4

import chess.engine
import pandas as pd
from tqdm.notebook import tqdm

import chess

STOCKFISH_PATH = "./stockfish/stockfish-ubuntu-x86-64-avx2"

In [2]:
def store_data(games: pd.DataFrame, n_moves, time, depth):
    """Write ``games`` to the next free versioned parquet file.

    The file name encodes ``depth``, ``time``, ``n_moves`` and the row count
    of ``games``, followed by a version counter that is bumped until the
    name does not exist yet, so earlier datasets are never overwritten.

    Args:
        games: Frame to persist (one row per recorded move).
        n_moves: Move cap used while generating the games (file name only).
        time: Analysis time limit used while generating (file name only).
        depth: Search depth used while generating (file name only).
    """
    out_dir = "chess-dataset/dataset-v3"
    # Create the target directory on first use instead of failing in to_parquet.
    os.makedirs(out_dir, exist_ok=True)

    def versioned_name(version):
        return f"{out_dir}/chess-ds-v1-{depth}-{time}-{n_moves}-{len(games)}-{version}.parquet.zstd"

    v = 1
    fname = versioned_name(v)
    while os.path.exists(fname):
        v += 1
        fname = versioned_name(v)

    games.to_parquet(fname, index=False, compression="zstd")

In [3]:
def extract_entries(game: pd.DataFrame, max_depth, max_time):
    """Flatten collected games into one frame with a row per recorded move.

    Args:
        game: Collection of games; anything ``pd.DataFrame`` accepts (a frame
            or a list of dicts) whose ``"moves"`` entry holds the list of
            per-move records of one game.
        max_depth: Value written to the ``depth`` column of every row.
        max_time: Value written to the ``time`` column of every row.

    Returns:
        A frame containing every move record, a fresh random ``game_id``
        shared by all rows of the same game, the ``depth``/``time`` columns,
        and ``#_turn`` holding the position of the move within its game.
    """
    games_frame = pd.DataFrame(game)

    per_game = []
    if not games_frame.empty:
        for moves in games_frame["moves"]:
            per_game.append(pd.DataFrame(moves).assign(game_id=str(uuid4())))

    # Concatenate once: growing a frame inside the loop copies all previous
    # rows on every iteration and drags an empty seed frame through concat.
    entries = pd.concat(per_game) if per_game else pd.DataFrame()
    entries["depth"] = max_depth
    entries["time"] = max_time
    # Each per-game frame carries a 0..n-1 index, which becomes the turn number.
    return entries.reset_index(names=["#_turn"])

In [4]:
import math
def _score(x):
    """Logarithmic mate-distance score: ``±(10000 - 2000 * ln(|x| + 1))``.

    The result is negative for ``x <= 0`` and positive otherwise, and its
    magnitude shrinks as ``|x|`` grows.
    """
    magnitude = 10000 - 2000 * math.log(abs(x) + 1)
    if x <= 0:
        return -magnitude
    return magnitude

_score(0)  # mate in 0 evaluates to -10000.0


-10000.0

In [None]:
# Constants
MAX_MOVES = 120  # Cap on engine-chosen plies per game, counted after the random opening move
MAX_GAMES = 200  # Finished games collected per collect_games() call, i.e. per stored dataset file
STOCKFISH_DEPTH_LIMIT = 10  # Depth limit for Stockfish calculation; NOTE(review): unused in the visible code, the driver loop passes depths 1-4 instead
MAX_ANALYSIS_TIME = 100  # Per-analysis time limit in seconds; also recorded in the "time" column and the output file name
N_BEST_MOVES = 2  # Number of principal variations (multipv) requested for each position

# Start the Stockfish process (UCI) that every analysis call below shares
engine = chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH)

# Configure Stockfish engine options
def configure_engine(engine):
    """Apply the Stockfish options used for data generation.

    Options are set through ``engine.configure`` instead of writing raw
    ``setoption`` lines to ``engine.protocol``: the raw writes bypass the
    synchronous ``SimpleEngine`` wrapper (the protocol is driven from the
    engine's own background event loop) and python-chess never learns which
    options were set.

    Args:
        engine: A ``chess.engine.SimpleEngine`` (or any object exposing a
            compatible ``configure(options)`` method).
    """
    # Threads: search threads; Hash: transposition table size (MB per the UCI convention).
    # A UCI_Elo cap of 3190 was sketched here previously but is not applied.
    engine.configure({"Threads": 4, "Hash": 10000})

# Convert a mate-in-N score to a large numeric stand-in
def mate_to_centipawn(mate_in_moves):
    """Map a mate distance to ``±10000 / (2**|N| + 1)``.

    The value is negative for ``mate_in_moves <= 0`` and positive otherwise.
    Its magnitude halves roughly with every extra move to mate (5000 for 0,
    about 3333 for 1, 2000 for 2, ...), i.e. it decays exponentially.
    A logarithmic alternative, ``sign * (10000 - 2000 * log(|N| + 1))``,
    is not used here.
    """
    direction = -1 if mate_in_moves <= 0 else 1
    denominator = 2 ** abs(mate_in_moves) + 1
    return direction * 10_000 / denominator

# Analyze a position and return the chosen move and score
def analyze_position(engine, board, limit, n_best_moves):
    """Analyse ``board`` and pick one of the engine's top lines.

    Args:
        engine: Engine exposing ``analyse(board, limit, info=..., multipv=...)``.
        board: Position to analyse.
        limit: Search limit forwarded to the engine.
        n_best_moves: Number of principal variations to request (``multipv``).

    Returns:
        The chosen info dict (it always has a score and a non-empty ``"pv"``),
        or ``None`` when the engine returned no usable line.
    """
    results = engine.analyse(board, limit, info=chess.engine.INFO_ALL, multipv=n_best_moves)

    # Info dicts omit missing keys rather than storing None, so look them up
    # with .get(); a line without a PV is unusable because callers read
    # result["pv"][0].
    valid_results = [
        res for res in results
        if res.get("score") is not None and res.get("pv")
    ]
    if not valid_results:
        return None

    # With probability 0.3 (and at least two candidates) choose uniformly
    # between the two best lines, so the second-best move is played in
    # roughly 15% of positions; otherwise play the best line.
    if random.random() <= 0.3 and len(valid_results) > 1:
        return random.choice(valid_results[:2])
    return valid_results[0]

# Build the dataset record for the move selected by an analysis result
def extract_move_data(board, result):
    """Describe the position on ``board`` and the first move of ``result``'s PV.

    Must be called before the move is pushed: the FEN, capture flag and legal
    move lists are all read from ``board`` as passed in.

    Args:
        board: Position the analysis was run on.
        result: Analysis info dict providing ``"pv"`` and ``"score"``.

    Returns:
        Dict with keys ``board``, ``move``, ``is_capture``, ``score``,
        ``raw_score``, ``outcome`` (always ``None`` here; callers may fill it
        in later), ``legal_moves`` and ``legal_captures``.
    """
    chosen_move = result["pv"][0]
    pov_score = result["score"].pov(board.turn)

    # Non-mate evaluations are converted from centipawns to pawns (/ 100);
    # mate evaluations use mate_to_centipawn() as-is.
    # NOTE(review): the two branches are on different scales — confirm intended.
    numeric_score = (
        mate_to_centipawn(pov_score.mate())
        if pov_score.is_mate()
        else pov_score.score() / 100
    )

    record = {
        "board": board.fen(),
        "move": chosen_move.uci(),
        "is_capture": board.is_capture(chosen_move),
        "score": numeric_score,
        "raw_score": str(pov_score),
        "outcome": None,
    }
    record["legal_moves"] = [candidate.uci() for candidate in board.legal_moves]
    record["legal_captures"] = [capture.uci() for capture in board.generate_legal_captures()]
    return record

# Play engine-vs-engine games and keep the ones that reach a terminal position
def collect_games(engine, max_depth, max_games):
    """Self-play games until ``max_games`` finished games have been recorded.

    Every game starts with a move drawn uniformly from a 20-line multipv
    analysis of the initial position; the remaining moves come from
    ``analyze_position``. A game is kept only if the board reports game over
    within ``MAX_MOVES`` further moves; games that run out of moves, or for
    which analysis yields nothing, are dropped and a new game is started.

    Args:
        engine: Engine used for all analysis calls.
        max_depth: Search depth for the analysis limit (the time limit is
            ``MAX_ANALYSIS_TIME``).
        max_games: Number of finished games to collect.

    Returns:
        List of dicts with keys ``moves`` (per-move records, the last one
        carrying the termination name in ``outcome``), ``result``, ``len``
        and ``depth``.
    """
    search_limit = chess.engine.Limit(depth=max_depth, time=MAX_ANALYSIS_TIME)
    finished_games = []

    with tqdm(total=max_games, desc=f"Depth = {max_depth}") as progress:
        while len(finished_games) < max_games:
            board = chess.Board()

            # Opening move: any of the engine's top 20 lines, chosen at random.
            opening_lines = engine.analyse(board, search_limit, info=chess.engine.INFO_ALL, multipv=20)
            opening = random.choice(opening_lines)
            history = [extract_move_data(board, opening)]
            board.push(opening["pv"][0])

            # Remaining moves, one ply per iteration.
            for _ in range(MAX_MOVES):
                chosen = analyze_position(engine, board, search_limit, N_BEST_MOVES)
                if chosen is None:
                    print("No valid results found for analysis.")
                    break

                # Record the position before the move is played on the board.
                record = extract_move_data(board, chosen)
                board.push(chosen["pv"][0])
                history.append(record)

                if not board.is_game_over():
                    continue

                # Terminal position: tag the final record and keep the game.
                record["outcome"] = board.outcome().termination.name
                finished_games.append({
                    "moves": history,
                    "result": record["outcome"],
                    "len": len(history),
                    "depth": max_depth,
                })
                progress.update(1)
                break

    return finished_games


# Generate datasets for depths 1-4 over and over until the cell is interrupted.
try:
    configure_engine(engine)
    while True:
        for depth in range(1, 5):
            games = collect_games(engine, depth, MAX_GAMES)
            entries = extract_entries(games, depth, MAX_ANALYSIS_TIME)
            store_data(entries, MAX_MOVES, MAX_ANALYSIS_TIME, depth)
finally:
    # The loop only ends via an interrupt or an error; shut the Stockfish
    # subprocess down in either case so re-running the cell (which starts a
    # new engine) does not leave the old process behind.
    engine.close()