# Preprocessing

In [1]:
# Import necessary libraries
import asyncio
import chess
import chess.engine
import chess.pgn
import json
import numpy as np
import os
import pandas as pd
import random
import shutil
import statistics
import tensorflow as tf
import tf2onnx

from chess import Board
from chess.pgn import Game
from tqdm import tqdm

from tensorflow.keras.layers import Conv2D, Flatten, Dense, Dropout
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping





In [2]:
# Load the dataset
df = pd.read_csv('games.csv')

# Count missing values in each column
missing_values = df.isnull().sum()
print("Missing values in each column:")
print(missing_values)

# Droping rows with missing values
df = df.dropna()

# Display mach count
print(f"Total number of games: {len(df)}")

Missing values in each column:
id                0
rated             0
created_at        0
last_move_at      0
turns             0
victory_status    0
winner            0
increment_code    0
white_id          0
white_rating      0
black_id          0
black_rating      0
moves             0
opening_eco       0
opening_name      0
opening_ply       0
dtype: int64
Total number of games: 20058


In [3]:
# Only keep games with a white rating of at least 1000
df = df[df['white_rating'] >= 1000]
# Displau mach count after filtering
print(f"Number of games with white rating >= 1000: {len(df)}")

Number of games with white rating >= 1000: 19811


In [4]:
# Converting the Lichess-style millisecond timestamp into 'YYYY.MM.DD'
df['Date'] = pd.to_datetime(df['created_at'], unit='ms').dt.strftime('%Y.%m.%d')

# Function to convert a DataFrame row into a PGN-formatted string
def row_to_pgn(row):
    game = chess.pgn.Game()

    # --- Headers ---
    game.headers["Event"]   = row["id"]
    game.headers["Site"]    = "https://lichess.org"
    game.headers["Date"]    = row["Date"]
    game.headers["Round"]   = "?"
    game.headers["White"]   = f"{row['white_id']} ({int(row['white_rating'])})"
    game.headers["Black"]   = f"{row['black_id']} ({int(row['black_rating'])})"
    game.headers["Result"]  = row["winner"] if pd.notna(row["winner"]) else "*"
    game.headers["ECO"]     = row.get("opening_eco", "")
    game.headers["Opening"] = row.get("opening_name", "")

    # --- Moves ---
    board = game.board()
    node  = game
    # Split the moves by space and parse each SAN move
    # The moves are expected to be in Standard Algebraic Notation (SAN)
    for san in row["moves"].split():
        move = board.parse_san(san)
        node = node.add_variation(move)
        board.push(move)

    # --- Export PGN ---
    exporter = chess.pgn.StringExporter(headers=True, variations=False, comments=False)
    return game.accept(exporter)

# Write all games into one PGN file
output_path = "games.pgn"
with open(output_path, "w", encoding="utf-8") as out:
    for _, row in df.iterrows():
        out.write(row_to_pgn(row))
        out.write("\n\n")


print(f"✔️ Saved {len(df)} games to '{output_path}'")

✔️ Saved 19811 games to 'games.pgn'


In [5]:
# Mapping moves to integers
def build_full_move_mapping(pgn_path):
    moves = set()
    with open(pgn_path) as f:
        while True:
            game = chess.pgn.read_game(f)
            if game is None:
                break
            board = game.board()
            for mv in game.mainline_moves():
                board.push(mv)
                moves.add(mv.uci())
    sorted_moves = sorted(moves)
    int_to_move = {i:uci for i, uci in enumerate(sorted_moves)}
    move_to_int = {uci:i for i, uci in int_to_move.items()}
    return move_to_int, int_to_move

move_to_int, int_to_move = build_full_move_mapping("games.pgn")

print(f"Found {len(int_to_move)} unique moves (including castle & promotions).")
# Save to JSON for later reuse:
with open("full_move_mapping_1000.json","w") as f:
    # JSON keys must be strings
    json.dump({str(i): m for i,m in int_to_move.items()}, f)

Found 1884 unique moves (including castle & promotions).


In [6]:
# Load the dataset

# Open the PGN file
pgn_path = "games.pgn"
pgn = open(pgn_path, encoding="utf-8")

# Parse all games into a list
games = []
while True:
    game = chess.pgn.read_game(pgn)
    if game is None:
        break
    games.append(game)
pgn.close()

print(f"Loaded {len(games)} games.")

Loaded 19811 games.


In [7]:
# Matrix representation of the board
def board_to_matrix(board: Board):
    # 8x8 for the board, 12 for the piece types (6 white + 6 black)
    # Initialize a 3D numpy array with zeros
    matrix = np.zeros((8, 8, 12))
    piece_map = board.piece_map()
    for square, piece in piece_map.items():
        # Convert square index to row and column
        row, col = divmod(square, 8)
        # Map piece type and color to the matrix
        piece_type = piece.piece_type - 1
        # White pieces are 0-5, black pieces are 6-11
        piece_color = 0 if piece.color else 6
        matrix[row, col, piece_type + piece_color] = 1
    return matrix

#
def create_input_with_phase(games):
    """
    Returns three parallel lists:
      X      = list of board_to_matrix(board) at each ply
      y      = list of UCI strings of the moves played
      phases = list of 1-based ply numbers
    """
    # Initialize lists to hold the data
    X, y, phases = [], [], []
    for game in games:
        board = game.board()
        for ply, move in enumerate(game.mainline_moves(), start=1):
            X.append(board_to_matrix(board))
            y.append(move.uci())
            phases.append(ply)
            board.push(move)
    return X, y, phases


# Create input for neural network training
def create_input_for_nn(games):
    X = []
    y = []
    for game in games:
        board = game.board()
        for move in game.mainline_moves():
            X.append(board_to_matrix(board))
            y.append(move.uci())
            board.push(move)
    return X, y

# Load the mapping
with open("full_move_mapping_1000.json") as f:
    raw = json.load(f)
int_to_move = {int(k):v for k,v in raw.items()}
move_to_int = {v:k for k,v in int_to_move.items()}

# Number of moves in policy head
N_moves = len(int_to_move)



In [8]:
# Load positions + ply counts
X_all, y_all, phases = create_input_with_phase(games)

# Bucket by phase
buckets = {'opening': [], 'middlegame': [], 'endgame': []}
for board_mat, uci, ply in zip(X_all, y_all, phases):
    if   ply <= 20:
        buckets['opening'].append((board_mat, uci))
    elif ply <= 60:
        buckets['middlegame'].append((board_mat, uci))
    else:
        buckets['endgame'].append((board_mat, uci))

# Oversample endgames to 25% of total
total      = len(X_all)
target_end = int(total * 0.25)
combined   = (
    buckets['opening']
  + buckets['middlegame']
  + random.choices(buckets['endgame'], k=target_end)
)

# Shuffle & unzip back into X_bal, y_bal
random.shuffle(combined)
X_bal, y_bal = zip(*combined)
X_bal, y_bal = list(X_bal), list(y_bal)

# One-hot encode balanced labels
y_idxs  = [move_to_int[uci] for uci in y_bal]
y_train = to_categorical(y_idxs, num_classes=N_moves)

# Convert features to NumPy
X_train = np.array(X_bal, dtype=np.float32)


# Experiments

In [9]:
# Define the EarlyStopping callback
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=3,
    min_delta=0.001,
    restore_best_weights=True
)

In [10]:

model = Sequential([
    Conv2D(64, (3, 3), activation='relu', input_shape=(8, 8, 12)),
    Dropout(0.3),
    Conv2D(128, (3, 3), activation='relu'),
    Dropout(0.3),
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.3),
    Dense(N_moves, activation='softmax')
])
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()
model.fit(
    X_train,
    y_train,
    epochs=20,
    validation_split=0.1,
    batch_size=64,
    callbacks=[early_stop]
)
model.save("models/TF_20EPOCHS_WR1000_map.keras")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m84s[0m 5ms/step - accuracy: 0.0813 - loss: 5.6994 - val_accuracy: 0.1523 - val_loss: 4.5277
Epoch 2/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 5ms/step - accuracy: 0.1304 - loss: 4.7741 - val_accuracy: 0.1635 - val_loss: 4.2733
Epoch 3/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 5ms/step - accuracy: 0.1392 - loss: 4.5974 - val_accuracy: 0.1701 - val_loss: 4.1577
Epoch 4/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 5ms/step - accuracy: 0.1432 - loss: 4.5143 - val_accuracy: 0.1706 - val_loss: 4.0822
Epoch 5/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 5ms/step - accuracy: 0.1454 - loss: 4.4621 - val_accuracy: 0.1731 - val_loss: 4.0453
Epoch 6/20
[1m17410/17410[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 5ms/step - accuracy: 0.1473 - loss: 4.4199 - val_accuracy: 0.1733 - val_loss: 4.017

In [11]:
int_to_move = dict(zip(move_to_int.values(), move_to_int.keys()))

def predict_next_move(board):
    board_matrix = board_to_matrix(board).reshape(1, 8, 8, 12)
    predictions = model.predict(board_matrix)[0]
    legal_moves = list(board.legal_moves)
    legal_moves_uci = [move.uci() for move in legal_moves]
    sorted_indices = np.argsort(predictions)[::-1]
    for move_index in sorted_indices:
        move = int_to_move[move_index]
        if move in legal_moves_uci:
            return move
    return None

In [12]:
board = Board()

In [13]:

# Display the board before prediction
print("Board before prediction:")
print(board)

# Predict and make the move
next_move = predict_next_move(board)
board.push_uci(next_move)

# Display the board after prediction
print("\nPredicted move:", next_move)
print("Board after prediction:")
print(board)


Board before prediction:
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
P P P P P P P P
R N B Q K B N R
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step

Predicted move: e2e4
Board after prediction:
r n b q k b n r
p p p p p p p p
. . . . . . . .
. . . . . . . .
. . . . P . . .
. . . . . . . .
P P P P . P P P
R N B Q K B N R


In [14]:
print(str(Game.from_board(board)))

game = chess.pgn.Game()

[Event "?"]
[Site "?"]
[Date "????.??.??"]
[Round "?"]
[White "?"]
[Black "?"]
[Result "*"]

1. e4 *


In [15]:
# Load CSV
df = pd.read_csv('games.csv')
df.count()

# Convert the Lichess-style millisecond timestamp into 'YYYY.MM.DD'
df['Date'] = pd.to_datetime(df['created_at'], unit='ms').dt.strftime('%Y.%m.%d')

def row_to_pgn(row):
    """Turn a DataFrame row into a PGN-formatted string."""
    game = chess.pgn.Game()

    # --- Headers ---
    game.headers["Event"]   = row["id"]
    game.headers["Site"]    = "https://lichess.org"
    game.headers["Date"]    = row["Date"]
    game.headers["Round"]   = "?"
    game.headers["White"]   = f"{row['white_id']} ({int(row['white_rating'])})"
    game.headers["Black"]   = f"{row['black_id']} ({int(row['black_rating'])})"
    game.headers["Result"]  = row["winner"] if pd.notna(row["winner"]) else "*"
    game.headers["ECO"]     = row.get("opening_eco", "")
    game.headers["Opening"] = row.get("opening_name", "")

    # --- Moves ---
    board = game.board()
    node  = game
    for san in row["moves"].split():
        move = board.parse_san(san)
        node = node.add_variation(move)
        board.push(move)

    # --- Export PGN ---
    exporter = chess.pgn.StringExporter(headers=True, variations=False, comments=False)
    return game.accept(exporter)

# Write all games into one PGN file
output_path = "games.pgn"
with open(output_path, "w", encoding="utf-8") as out:
    for _, row in df.iterrows():
        out.write(row_to_pgn(row))
        out.write("\n\n")

print(f"✔️ Saved {len(df)} games to '{output_path}'")


# Open the PGN file
pgn_path = "games.pgn"
pgn = open(pgn_path, encoding="utf-8")

# Parse all games into a list
games = []
while True:
    game = chess.pgn.read_game(pgn)
    if game is None:
        break
    games.append(game)
pgn.close()

print(f"Loaded {len(games)} games.")

✔️ Saved 20058 games to 'games.pgn'
Loaded 20058 games.


In [16]:
# --- CONFIG ----------------------------------------------------------------
STOCKFISH_PATH = r"C:\BPR\Attempt2\stockfishe\stockfish-windows-x86-64-avx2.exe"
MAPPING_PATH   = "full_move_mapping_1000.json"       # serialized mapping
MAX_SAMPLES    = 50000      # max FENs to evaluate
BATCH_SIZE     = 512        # batch size for model predictions
# --- Windows + Jupyter fix for subprocesses --------------------------------
if hasattr(asyncio, "WindowsProactorEventLoopPolicy"):
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# ---  Load trained model ---------------------------------------------------
model = load_model("models/TF_20EPOCHS_WR1000_map.keras")

# ---  Load the canonical move mapping from JSON ----------------------------
with open("full_move_mapping_1000.json") as f:
    raw = json.load(f)               # keys were dumped as strings
# convert keys back to ints
int_to_move = {int(k):v for k,v in raw.items()}
move_to_int = {v: k for k, v in int_to_move.items()}

# sanity-check: model outputs must match mapping size
n_out = model.output_shape[-1]
n_map = len(int_to_move)
assert n_out == n_map, (
    f"⚠️  Mismatch: model has {n_out} outputs but mapping has {n_map} moves."
)
print(f"✅ Loaded mapping with {n_map} moves → matches model outputs.")

# --- Setup Stockfish -------------------------------------------------------
engine = chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH)

# ---  Load & sample your test positions ------------------------------------
test_fens = []
with open("games.pgn") as pgn:
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:
            break
        board = game.board()
        for move in game.mainline_moves():
            board.push(move)
            if board.fullmove_number >= 10 and board.fullmove_number % 5 == 0:
                test_fens.append(board.fen())

print(f"Loaded {len(test_fens)} total positions from your PGN.")

if len(test_fens) > MAX_SAMPLES:
    test_fens = random.sample(test_fens, MAX_SAMPLES)
    print(f"Down-sampled to {len(test_fens)} positions for this run.")

test_fens = [fen for fen in test_fens if not chess.Board(fen).is_game_over()]
print(f"{len(test_fens)} non-terminal positions remain after filtering.")

# ---  Batch model predictions ----------------------------------------------
all_X = np.stack([board_to_matrix(chess.Board(fen)) for fen in test_fens])
print(f"Running model.predict on {len(test_fens)} positions…")
all_probs = model.predict(all_X, batch_size=BATCH_SIZE, verbose=0)

# ---  Stockfish agreement loop with a single tqdm bar ----------------------
def evaluate_agreement(fen_list, probs_list, depth=3, top_n=1):
    correct = total = 0

    for fen, probs in tqdm(zip(fen_list, probs_list),
                           total=len(fen_list),
                           desc=f"Depth={depth} Top-{top_n}",
                           unit="pos",
                           dynamic_ncols=True,
                           leave=True):
        board = chess.Board(fen)

        # Stockfish best move
        result = engine.play(board, chess.engine.Limit(depth=depth))
        if result.move is None:
            continue
        sf_move = result.move.uci()

        # Our model’s top-n moves (only those in mapping)
        ranked = np.argsort(probs)[::-1]
        top_moves = [int_to_move[i] for i in ranked[:top_n]]

        if sf_move in top_moves:
            correct += 1
        total += 1

    if total == 0:
        print("No positions evaluated!")
        return 0.0

    acc = correct / total
    print(f"\n=> Evaluated {total} positions.")
    print(f"=> Top-{top_n} agreement at depth {depth}: "
          f"{correct}/{total} = {acc*100:.2f}%")
    return acc

# ---  Run both Top-1 and Top-3 tests ---------------------------------------
if __name__ == "__main__":
    evaluate_agreement(test_fens, all_probs, depth=3, top_n=1)
    evaluate_agreement(test_fens, all_probs, depth=3, top_n=3)
    engine.quit()


✅ Loaded mapping with 1884 moves → matches model outputs.
Loaded 191573 total positions from your PGN.
Down-sampled to 50000 positions for this run.
49634 non-terminal positions remain after filtering.
Running model.predict on 49634 positions…


Depth=3 Top-1: 100%|██████████| 49634/49634 [00:27<00:00, 1810.48pos/s]



=> Evaluated 49634 positions.
=> Top-1 agreement at depth 3: 4934/49634 = 9.94%


Depth=3 Top-3: 100%|██████████| 49634/49634 [00:26<00:00, 1884.97pos/s]


=> Evaluated 49634 positions.
=> Top-3 agreement at depth 3: 9974/49634 = 20.10%





In [17]:
engine = chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH)
# Sanity‐check A: SF moves vs. move index map
missing = set()
for fen in random.sample(test_fens, min(500, len(test_fens))):
    board = chess.Board(fen)
    result = engine.play(board, chess.engine.Limit(depth=1))
    sf = result.move
    if sf is None:
        continue
    sf_uci = sf.uci()
    # If Stockfish’s move isn’t one of the moves your model knows about:
    if sf_uci not in int_to_move.values():
        missing.add(sf_uci)
if missing:
    print("⚠️  These SF moves are NOT in int_to_move:\n", missing)
else:
    print("✔️  All sampled SF moves map to your model’s move set.")


✔️  All sampled SF moves map to your model’s move set.


In [18]:
hits = []
for fen in random.sample(test_fens, min(1000, len(test_fens))):
    board = chess.Board(fen)
    sf_res = engine.play(board, chess.engine.Limit(depth=1))
    if sf_res.move is None:
        continue
    sf_move = sf_res.move

    legal = list(board.legal_moves)
    if len(legal) < 3:
        continue

    picks = random.sample(legal, 3)
    hits.append(int(sf_move in picks))

print(f"Random‐legal Top-3 baseline: {statistics.mean(hits)*100:.2f}%")


Random‐legal Top-3 baseline: 16.56%


#  Deployment

In [19]:
# Paths
KERAS_MODEL_PATH = "models/TF_20EPOCHS_WR1000_map.keras"
EXPORT_DIR       = "export"
ONNX_PATH        = os.path.join(EXPORT_DIR, "policy_fullmoves.onnx")
MAPPING_SRC      = "full_move_mapping_1000.json"
MAPPING_DST      = os.path.join(EXPORT_DIR, "move_mapping.json")

os.makedirs(EXPORT_DIR, exist_ok=True)

# Load Keras model
model = tf.keras.models.load_model(KERAS_MODEL_PATH)
print("Loaded Keras model with output shape", model.output_shape)

# Wrap it in a tf.function with explicit input_signature
# This tells tf2onnx what the input shape and dtype are.
input_signature = [
    tf.TensorSpec([None, 8, 8, 12], tf.float32, name="input")
]
@tf.function(input_signature=input_signature)
def model_fn(x):
    # returns Tensor of shape [None, N_moves]
    return model(x)

# Convert to ONNX via from_function (now with input_signature)
print("→ Converting to ONNX…")
model_proto, _ = tf2onnx.convert.from_function(
    model_fn,
    input_signature=input_signature,
    opset=13,
    output_path=ONNX_PATH
)
print(f"✅ ONNX model saved to {ONNX_PATH}")

# Copy your move-mapping JSON next to the ONNX
shutil.copyfile(MAPPING_SRC, MAPPING_DST)
print(f"✅ Mapping JSON copied to {MAPPING_DST}")


Loaded Keras model with output shape (None, 1884)
→ Converting to ONNX…


rewriter <function rewrite_constant_fold at 0x000002435CCE5BC0>: exception `np.cast` was removed in the NumPy 2.0 release. Use `np.asarray(arr, dtype=dtype)` instead.


✅ ONNX model saved to export\policy_fullmoves.onnx
✅ Mapping JSON copied to export\move_mapping.json
