In [7]:
import chess.pgn
import os

def extract_positions_from_pgn(pgn_path, max_games=100):
    games = []
    with open(pgn_path) as pgn:
        for _ in range(max_games):
            game = chess.pgn.read_game(pgn)
            if game is None:
                break
            board = game.board()
            for move in game.mainline_moves():
                fen = board.fen()
                board.push(move)
                games.append((fen, move.uci()))
    return games

games = extract_positions_from_pgn("../data/magnus_games.pgn", max_games=500)
games[:3]

[('rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1', 'g1f3'),
 ('rnbqkbnr/pppppppp/8/8/8/5N2/PPPPPPPP/RNBQKB1R b KQkq - 1 1', 'd7d5'),
 ('rnbqkbnr/ppp1pppp/8/3p4/8/5N2/PPPPPPPP/RNBQKB1R w KQkq - 0 2', 'g2g3')]

In [8]:
import chess
import chess.engine
import pandas as pd
import numpy as np

STOCKFISH_PATH = "/opt/homebrew/bin/stockfish"  # adjust if needed

def init_engine():
    return chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH)

def material_balance(board):
    piece_values = {chess.PAWN:1, chess.KNIGHT:3, chess.BISHOP:3, chess.ROOK:5, chess.QUEEN:9}
    balance = 0
    for pt, val in piece_values.items():
        balance += len(board.pieces(pt, chess.WHITE)) * val
        balance -= len(board.pieces(pt, chess.BLACK)) * val
    return balance

def get_features(fen, engine=None):
    board = chess.Board(fen)
    features = {
        "turn": int(board.turn),
        "move_number": board.fullmove_number,
        "material_balance": material_balance(board),
        "castling_rights": int(board.has_kingside_castling_rights(chess.WHITE)) +
                           2 * int(board.has_kingside_castling_rights(chess.BLACK)),
        "mobility": board.legal_moves.count()
    }
    if engine:
        try:
            info = engine.analyse(board, chess.engine.Limit(depth=15))
            features["stockfish_eval"] = info["score"].white().score(mate_score=10000)
        except:
            features["stockfish_eval"] = 0
    else:
        features["stockfish_eval"] = 0
    return features

In [9]:
from tqdm import tqdm

def build_dataset(games, engine=None):
    data = []
    for fen, move in tqdm(games):
        try:
            features = get_features(fen, engine)
            features["move"] = move
            data.append(features)
        except:
            continue
    return pd.DataFrame(data)

engine = init_engine()
df = build_dataset(games, engine)
engine.quit()

df.head()

100%|██████████| 42688/42688 [1:16:23<00:00,  9.31it/s]  


Unnamed: 0,turn,move_number,material_balance,castling_rights,mobility,stockfish_eval,move
0,1,1,0,3,20,39,g1f3
1,0,1,0,3,20,26,d7d5
2,1,2,0,3,22,26,g2g3
3,0,2,0,3,28,27,g8f6
4,1,3,0,3,23,24,f1g2


In [11]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

def cluster_positions(df, n_clusters=10):
    X = df.drop(columns=["move"])
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    pca = PCA(n_components=5)
    reduced = pca.fit_transform(X_scaled)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(reduced)
    return kmeans, reduced, kmeans.labels_

kmeans_model, reduced, cluster_labels = cluster_positions(df)
df["cluster"] = cluster_labels

df["cluster"].value_counts()

cluster
9    9761
6    9738
8    4848
4    4803
2    4800
0    4773
3    1950
1    1227
7     424
5     364
Name: count, dtype: int64

In [26]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, top_k_accuracy_score
import numpy as np

# Filter to top N most frequent moves
N = 200
top_moves = df['move'].value_counts().nlargest(N).index
df_filtered = df[df['move'].isin(top_moves)].copy()

# Define features and label
feature_cols = ['turn', 'move_number', 'material_balance', 'castling_rights',
                'mobility', 'stockfish_eval', 'cluster']
X = df_filtered[feature_cols].copy()
X['turn'] = X['turn'].map({'w': 0, 'b': 1})

# Encode labels
le = LabelEncoder()
y = le.fit_transform(df_filtered['move'])

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
clf.fit(X_train, y_train)

# Predict probabilities
y_proba = clf.predict_proba(X_test)

# Only evaluate samples whose labels are in clf.classes_
valid_mask = np.isin(y_test, clf.classes_)
y_test_valid = y_test[valid_mask]
y_proba_valid = y_proba[valid_mask]

# Compute top-k accuracy
print("Top-1 Accuracy:", clf.score(X_test, y_test))
print("Top-3 Accuracy:", top_k_accuracy_score(y_test_valid, y_proba_valid, k=3, labels=clf.classes_))

# Generate classification report
y_pred = clf.predict(X_test)
print("\nClassification Report:")
print(classification_report(
    y_test,
    y_pred,
    labels=clf.classes_,
    target_names=le.inverse_transform(clf.classes_),
    zero_division=0
))

Top-1 Accuracy: 0.10986101919258769
Top-3 Accuracy: 0.2126626957864549

Classification Report:
              precision    recall  f1-score   support

        a1b1       0.14      0.16      0.15        19
        a1c1       0.05      0.06      0.06        16
        a1d1       0.05      0.06      0.06        16
        a2a3       0.00      0.00      0.00        31
        a2a4       0.00      0.00      0.00        35
        a4a3       0.00      0.00      0.00        13
        a4a5       0.10      0.05      0.07        19
        a5a4       0.00      0.00      0.00        21
        a6a5       0.00      0.00      0.00        18
        a7a5       0.00      0.00      0.00        20
        a7a6       0.07      0.08      0.08        37
        a8b8       0.00      0.00      0.00        19
        a8c8       0.00      0.00      0.00        18
        a8d8       0.00      0.00      0.00        20
        b1c3       0.34      0.30      0.32        80
        b1d2       0.24      0.19      0

In [27]:
def predict_magnus_move(example_row):
    example_row = example_row.copy()
    example_row['turn'] = example_row['turn'].map({'w': 0, 'b': 1})
    proba = clf.predict_proba(example_row)[0]
    top_indices = np.argsort(proba)[::-1][:5]
    return [(le.inverse_transform([i])[0], proba[i]) for i in top_indices]

# Try on one sample
predict_magnus_move(X_test.iloc[[0]])

[('d7d5', 0.3988131677404123),
 ('g8f6', 0.32767345150549476),
 ('d7d6', 0.18723281044720674),
 ('c7c5', 0.08628057030688609),
 ('d1d4', 0.0)]