# CHESS OUTCOME PREDICTION — PRE-PROCESSING

**Goal:** prepare a clean, sequence-only dataset for deep learning.

**Cut-off:** first 30 full moves (60 plies), controlled by `CUTOFF_FULL_MOVES` in the config cell.

**Input:** `data/chess_games_subset.csv` (uses `AN` for moves, `Result` for labels)  
**Output:** `data/chess_games_clean.csv`, `data/chess_games_clean_meta.json`, `data/chess_boards_8x8xC.npz`

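**Process:** load subset → tokenise `AN` → strip move numbers/result tokens → keep first 60 plies → add `plies_processed`, `cutoff_reached`, `captures_in_first_30_moves`, `checks_in_first_30_moves` → add pre-game Elo features → keep games that reach the cut-off → drop draws → impute/downcast → leakage checks → save CSV, board tensors and metadata.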

In [57]:
# ==============================================
# 1. Imports & Paths
# ==============================================
from pathlib import Path
import json
import numpy as np
import pandas as pd
import warnings
# NOTE(review): this silences every warning for the rest of the session, including
# pandas deprecation/dtype warnings that could point at real problems.
warnings.filterwarnings("ignore")
# Show up to 200 columns when a DataFrame is displayed.
pd.set_option("display.max_columns", 200)

# ----------------------------------------------
# 1.1 Ensure python-chess
# ----------------------------------------------
# Import python-chess; if the module is missing, install it with pip into the
# interpreter that runs this kernel (sys.executable) and import it again.
# NOTE(review): the install is unpinned, so the version obtained depends on when
# the cell happens to run.
try:
    import chess
except ModuleNotFoundError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "python-chess", "--quiet"])
    import chess
print("python-chess OK.")

# ----------------------------------------------
# 1.2 Config (sequence only)
# ----------------------------------------------
# Number of full moves (one white ply + one black ply each) kept per game.
CUTOFF_FULL_MOVES = 30
# The same cut-off expressed in plies (half-moves).
CUTOFF_PLIES = CUTOFF_FULL_MOVES * 2
# Output column names embed the cut-off, so they change whenever it changes.
MOVE_COL = f"moves_first{CUTOFF_FULL_MOVES}_san"
CAPTURES_COL = f"captures_in_first_{CUTOFF_FULL_MOVES}_moves"
CHECKS_COL = f"checks_in_first_{CUTOFF_FULL_MOVES}_moves"
# NOTE(review): SEQUENCE_ONLY is not referenced by any cell visible here —
# confirm whether it is still needed.
SEQUENCE_ONLY = True
# Toggle intended for the pre-game Elo features built in section 4.4.
INCLUDE_ELO   = True

# ----------------------------------------------
# 1.3 Paths
# ----------------------------------------------
# Everything is resolved relative to the kernel's working directory, so the
# notebook expects to run from a folder that sits next to `data/`.
NB_DIR   = Path.cwd()
DATA_DIR = (NB_DIR / "../data").resolve()
OUT_DIR  = DATA_DIR
# Create the data directory when it does not exist yet.
OUT_DIR.mkdir(parents=True, exist_ok=True)

RAW_SUBSET = DATA_DIR / "chess_games_subset.csv"
CLEAN_CSV  = DATA_DIR / "chess_games_clean.csv"
META_JSON  = DATA_DIR / "chess_games_clean_meta.json"

# Ply counts at which a board snapshot is encoded (default of boards_from_san_list).
CHECKPOINT_PLIES = [20, 40, 60]
# Destination of the stacked board tensors.
BOARDS_NPZ = DATA_DIR / "chess_boards_8x8xC.npz"

print("Data dir:", DATA_DIR)
print("Input :", RAW_SUBSET)
print("Output:", CLEAN_CSV)
print(f"Cutoff: {CUTOFF_FULL_MOVES} moves ({CUTOFF_PLIES} plies)")

python-chess OK.
Data dir: E:\Github Projects\chess-outcome-prediction\data
Input : E:\Github Projects\chess-outcome-prediction\data\chess_games_subset.csv
Output: E:\Github Projects\chess-outcome-prediction\data\chess_games_clean.csv
Cutoff: 30 moves (60 plies)


In [58]:
# ==============================================
# 2. Load Subset
# ==============================================
df_raw = pd.read_csv(RAW_SUBSET)

# ----------------------------------------------
# 2.1 Detect Columns
# ----------------------------------------------
# Case-insensitive lookup: lower-cased header -> header as spelled in the file.
cols = {c.lower(): c for c in df_raw.columns}

# Moves column: the first alias (in priority order) that exists in the file.
an_col = next(
    (cols[alias] for alias in ("an", "moves", "algebraic", "algebraicnotation") if alias in cols),
    None,
)
if an_col is None:
    raise ValueError("Moves column (AN) not found. Expected a column like 'AN' or 'Moves'.")

# Label column: same strategy with the label aliases.
label_col = next(
    (cols[alias] for alias in ("result", "target", "winner", "outcome") if alias in cols),
    None,
)
if label_col is None:
    raise ValueError("Label column not found. Expected 'Result' or similar.")

# ----------------------------------------------
# 2.2 Clean Labels
# ----------------------------------------------
def map_label(v):
    """Normalise a raw result value to "white", "black" or "draw".

    The value is stringified, stripped and lower-cased before lookup.
    Anything that is not a recognised spelling yields ``np.nan``.
    """
    spellings = {
        "1-0": "white", "white": "white", "w": "white",
        "0-1": "black", "black": "black", "b": "black",
        "1/2-1/2": "draw", "draw": "draw", "d": "draw",
    }
    normalised = str(v).strip().lower()
    return spellings.get(normalised, np.nan)

# Two-column working frame: the raw move text and the normalised label.
# Labels that map_label cannot interpret become NaN and those rows are dropped.
# NOTE(review): `.astype(str)` runs before `dropna`; on pandas versions where it
# stringifies missing values, the `moves_raw` part of the dropna cannot remove
# games whose move text is missing — confirm against the pandas version in use.
# `reset_index(drop=True)` discards df_raw's row labels, so once any row has been
# dropped df's index no longer identifies the originating df_raw row.
df = pd.DataFrame(
    {
        "moves_raw": df_raw[an_col].astype(str),
        "target": df_raw[label_col].map(map_label),
    }
).dropna(subset=["moves_raw", "target"]).reset_index(drop=True)

print(df.shape)
df.head(3)

(142303, 2)


Unnamed: 0,moves_raw,target
0,1. f4 d5 2. g3 c5 3. Bg2 Nc6 4. Nf3 Bf5 5. O-O...,white
1,1. e4 b6 2. d4 Bb7 3. Bd3 e6 4. f4 d6 5. Nf3 h...,white
2,1. d4 Nf6 2. c4 e6 3. Nc3 c5 4. d5 d6 5. e3 e5...,black


In [59]:
# ==============================================
# 3. Extract First CUTOFF_FULL_MOVES Moves (currently 30 moves = 60 plies)
# ==============================================

# ----------------------------------------------
# 3.1 Helpers
# ----------------------------------------------
# Tokens that encode the game result rather than a move
# ("*" is the PGN marker for an unfinished / unknown result).
RESULT_TOKENS = {"1-0", "0-1", "1/2-1/2", "*"}

def _strip_pgn_comments(text: str) -> str:
    """Return `text` with every `{...}` comment replaced by whitespace."""
    kept = []
    depth = 0
    for ch in text:
        if ch == "{":
            depth += 1
            kept.append(" ")
        elif ch == "}":
            depth = max(depth - 1, 0)  # tolerate a stray closing brace
            kept.append(" ")
        elif depth == 0:
            kept.append(ch)
    return "".join(kept)

def tokenize_an(s: str):
    """Split a movetext string into a list of bare SAN move tokens.

    Removed from the token stream:
      * `{...}` comments (e.g. engine evaluations or clock annotations),
      * move numbers in any form: "12.", "12...", or glued as in "12.Nf3",
      * the stand-alone "..." continuation marker,
      * result tokens (see RESULT_TOKENS),
      * numeric annotation glyphs ("$1") and trailing "!" / "?" marks.

    Check ("+") and mate ("#") suffixes are kept because the check counter
    relies on them.

    Parameters
    ----------
    s : str
        Raw movetext. ``None`` and float NaN are treated as "no moves".

    Returns
    -------
    list[str]
        Move tokens in game order; empty when there is nothing to parse.
    """
    if s is None or (isinstance(s, float) and s != s):
        return []

    keep = []
    # str.split() with no argument already treats \r and \n as separators.
    for tok in _strip_pgn_comments(str(s)).split():
        if tok in RESULT_TOKENS or tok == "..." or tok.startswith("$"):
            continue
        # Drop a leading move number ("12." / "12...") even when glued to the move.
        n_digits = len(tok) - len(tok.lstrip("0123456789"))
        if n_digits and tok[n_digits:n_digits + 1] == ".":
            tok = tok[n_digits:].lstrip(".")
        # Annotation marks are not part of SAN and would break move replay.
        tok = tok.rstrip("!?")
        if tok:
            keep.append(tok)
    return keep

def count_captures(seq):
    """Count the moves in `seq` whose text contains the capture marker "x"."""
    total = 0
    for move in seq:
        if "x" in move:
            total += 1
    return total

def count_checks(seq):
    """Count the moves in `seq` flagged as check ("+") or mate ("#")."""
    flagged = [move for move in seq if "+" in move or "#" in move]
    return len(flagged)

# --- Board helpers (append) ---
def _board_planes(b):
    """Encode a board as an (8, 8, 12) uint8 tensor of piece-occupancy planes.

    Channels 0-5 are the white pawn, knight, bishop, rook, queen and king;
    channels 6-11 are the same piece types for black. A cell is 1 where a
    piece of that kind stands and 0 elsewhere. Assuming python-chess square
    numbering (a1 = 0 ... h8 = 63), row 0 of each plane is the eighth rank
    and column 0 is the a-file.
    """
    piece_order = (chess.PAWN, chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN, chess.KING)
    tensor = np.zeros((8, 8, 12), dtype=np.uint8)
    channel = 0
    for side in (chess.WHITE, chess.BLACK):
        for piece_type in piece_order:
            for square in b.pieces(piece_type, side):
                rank_idx, file_idx = divmod(square, 8)
                tensor[7 - rank_idx, file_idx, channel] = 1
            channel += 1
    return tensor

def boards_from_san_list(san_list, checkpoints=CHECKPOINT_PLIES):
    """Replay SAN moves and stack board snapshots taken at the given ply counts.

    Parameters
    ----------
    san_list : iterable of str
        SAN moves in game order, starting from the standard initial position.
    checkpoints : iterable of int
        Ply counts at which to snapshot the board. They may be given in any
        order (and may repeat); output channels follow the order given here.

    Returns
    -------
    np.ndarray
        uint8 array of shape (8, 8, 12 * len(checkpoints)). A checkpoint that
        is never reached — the game is shorter, or a move cannot be parsed or
        is illegal — contributes an all-zero block of 12 planes. With no
        checkpoints the result has shape (8, 8, 0).
    """
    targets = [int(c) for c in checkpoints]
    if not targets:
        # np.concatenate rejects an empty list, so return the empty tensor directly.
        return np.zeros((8, 8, 0), dtype=np.uint8)

    pending = sorted(set(targets))  # visit checkpoints in ply order
    snapshots = {}
    board = chess.Board()
    nxt = 0  # index into `pending` of the next checkpoint to capture

    for ply, san in enumerate(san_list, start=1):
        if nxt >= len(pending):
            break  # every checkpoint is captured; no need to replay further
        try:
            board.push_san(san)
        except ValueError:
            # python-chess reports invalid, illegal and ambiguous SAN via
            # ValueError (or subclasses); later checkpoints stay zero-padded.
            break
        while nxt < len(pending) and ply >= pending[nxt]:
            snapshots[pending[nxt]] = _board_planes(board)
            nxt += 1

    blank = np.zeros((8, 8, 12), dtype=np.uint8)
    return np.concatenate([snapshots.get(c, blank) for c in targets], axis=-1)

# ----------------------------------------------
# 3.2 Build Sequence Columns
# ----------------------------------------------
# Tokenise every game, then truncate each token list at the cut-off.
tokens = df["moves_raw"].map(tokenize_an)
tokens_cut = tokens.map(lambda moves: moves[:CUTOFF_PLIES])

# Space-joined move string plus simple per-game counts over the kept plies.
df[MOVE_COL] = tokens_cut.map(" ".join)
df["plies_processed"] = tokens_cut.map(len)
df["cutoff_reached"] = df["plies_processed"].eq(CUTOFF_PLIES).astype(int)
df[CAPTURES_COL] = tokens_cut.map(count_captures)
df[CHECKS_COL] = tokens_cut.map(count_checks)

df = df.dropna(subset=[MOVE_COL, "target"]).reset_index(drop=True)

print(df.shape)
df[[MOVE_COL, "plies_processed", "target"]].head(3)

(142303, 7)


Unnamed: 0,moves_first30_san,plies_processed,target
0,f4 d5 g3 c5 Bg2 Nc6 Nf3 Bf5 O-O e6 d3 g6 Be3 h...,33,white
1,e4 b6 d4 Bb7 Bd3 e6 f4 d6 Nf3 h6 O-O a6 Qe2 Ne...,28,white
2,d4 Nf6 c4 e6 Nc3 c5 d5 d6 e3 e5 Nf3 Be7 Bd3 Nb...,60,black


In [60]:
# ==============================================
# 4. Assemble Working Frame (sequence-only, no leakage)
# ==============================================

# ----------------------------------------------
# 4.1 Select Columns
# ----------------------------------------------
keep_cols = [
    MOVE_COL,
    "plies_processed",
    "cutoff_reached",
    CAPTURES_COL,
    CHECKS_COL,
    "target",
]
missing = [name for name in keep_cols if name not in df.columns]
if missing:
    raise ValueError(f"Missing expected columns: {missing}")

X = df.loc[:, keep_cols].copy()

# ----------------------------------------------
# 4.2 Leakage Guard
# ----------------------------------------------
# Column names that must never reach the modelling frame at this point.
banned = {
    "Result", "result", "winner", "Winner", "termination", "Termination",
    "num_moves", "NumMoves", "Opening", "opening", "ECO", "eco", "Moves", "AN",
    "UTCDate", "UTCTime", "Event", "TimeControl", "White", "Black",
    "WhiteElo", "BlackElo", "white_elo", "black_elo",
    "WhiteRating", "BlackRating", "WhiteRatingDiff", "BlackRatingDiff",
}
present_banned = [name for name in X.columns if name in banned]
assert not present_banned, f"Leaky columns present in X: {present_banned}"

# ----------------------------------------------
# 4.3 Types
# ----------------------------------------------
X[MOVE_COL] = X[MOVE_COL].fillna("").astype(str)
X["target"] = X["target"].astype(str)
for numeric_col in ("plies_processed", "cutoff_reached", CAPTURES_COL, CHECKS_COL):
    X[numeric_col] = pd.to_numeric(X[numeric_col], errors="coerce")

# ----------------------------------------------
# 4.4 Elo Features (pre-game, derived only; read from df_raw)
# ----------------------------------------------
# Case-insensitive header lookup used when selecting the rating columns.
lower_map = {name.lower(): name for name in df_raw.columns}

def pick_rating(side: str, columns=None):
    """Pick the pre-game rating column for one side.

    Exact, case-insensitive names are tried first, in this order:
    ``<side>elo``, ``<side>_elo``, ``<side>rating``, ``<side>_rating``.
    If none exists, the first column whose name contains the side and either
    "elo" or "rating" is used. Names containing "diff" are skipped in that
    fallback: the notebook's leakage guard treats ``WhiteRatingDiff`` /
    ``BlackRatingDiff`` as leaky, so they must never be selected as a rating.

    Parameters
    ----------
    side : str
        "White" or "Black" (any casing).
    columns : iterable, optional
        Column names to search. Defaults to ``df_raw.columns`` together with
        the module-level ``lower_map``.

    Returns
    -------
    The matching column name as spelled in the data, or ``None``.
    """
    side_l = side.lower()
    if columns is None:
        names = list(df_raw.columns)
        by_lower = lower_map
    else:
        names = list(columns)
        by_lower = {str(name).lower(): name for name in names}

    for wanted in (f"{side_l}elo", f"{side_l}_elo", f"{side_l}rating", f"{side_l}_rating"):
        if wanted in by_lower:
            return by_lower[wanted]

    for name in names:
        low = str(name).lower()
        if "diff" in low:
            continue  # rating *change* columns are banned as leaky
        if side_l in low and ("elo" in low or "rating" in low):
            return name
    return None

w_col = pick_rating("White")
b_col = pick_rating("Black")

if INCLUDE_ELO and w_col and b_col:
    # X descends from `df`, whose index was reset after rows were dropped from
    # df_raw. Assigning df_raw columns by index label would therefore attach the
    # wrong game's ratings as soon as a single row had been dropped. Re-apply
    # the same row filter that produced `df` and assign by position instead.
    kept_rows = (
        df_raw[label_col].map(map_label).notna()
        & df_raw[an_col].astype(str).notna()
    )
    elo_raw = df_raw.loc[kept_rows, [w_col, b_col]].reset_index(drop=True)
    if len(elo_raw) != len(X):
        raise ValueError(
            f"Cannot align Elo columns: {len(elo_raw)} source rows vs {len(X)} rows in X. "
            "Run this cell directly after X is built, before any row filtering."
        )

    # Explicit, side-aware columns kept in the CLEAN set
    X["white_elo"] = pd.to_numeric(elo_raw[w_col], errors="coerce").to_numpy()
    X["black_elo"] = pd.to_numeric(elo_raw[b_col], errors="coerce").to_numpy()

    # Directional diff (white − black) and average; both are already numeric
    # because they are computed from the coerced columns above.
    X["elo_diff"] = X["white_elo"] - X["black_elo"]
    X["elo_avg"]  = (X["white_elo"] + X["black_elo"]) / 2.0

    print(f"Elo detected → White: '{w_col}', Black: '{b_col}'. "
          f"Added white_elo/black_elo + elo_diff/elo_avg.")
elif not INCLUDE_ELO:
    print("INCLUDE_ELO is False; skipping Elo features.")
else:
    print("No Elo columns detected in df_raw; skipping Elo features.")

# ----------------------------------------------
# 4.5 Filter: require the game to reach the cutoff
# ----------------------------------------------
REQUIRE_CUTOFF = True  # set False if you want to keep everything

if REQUIRE_CUTOFF:
    before = len(X)
    # Sequences are truncated at CUTOFF_PLIES, so ">=" keeps exactly the games
    # that reached the cut-off.
    X = X[X["plies_processed"] >= CUTOFF_PLIES].reset_index(drop=True)
    kept = len(X)
    # Guard the percentage so an empty input frame reports 0% instead of
    # raising ZeroDivisionError.
    share = kept / before if before else 0.0
    print(f"Kept games reaching {CUTOFF_PLIES} plies: {kept}/{before} ({share:.1%})")
else:
    print("Not filtering by cutoff (short games included).")

Elo detected → White: 'WhiteElo', Black: 'BlackElo'. Added white_elo/black_elo + elo_diff/elo_avg.
Kept games reaching 60 plies: 89061/142303 (62.6%)


In [61]:
# ==============================================
# 4.6 Binary Task (drop draws)
# ==============================================

BINARY_TASK = True  # True: keep only decisive games (white/black); False: keep draws too

if BINARY_TASK:
    before = len(X)
    X = X[X["target"].isin(["white", "black"])].reset_index(drop=True)
    kept = len(X)
    # X can be empty here (e.g. no game reached the cut-off); report 0% instead
    # of raising ZeroDivisionError.
    share = kept / before if before else 0.0
    print(f"Kept non-draws: {kept}/{before} ({share:.1%})")
else:
    print("Keeping draws (3-class task).")

Kept non-draws: 83944/89061 (94.3%)


In [62]:
# ==============================================
# 5. Impute & Downcast (sequence-only)
# ==============================================
import numpy as np

def mem_mb(df):
    """Return the deep memory footprint of `df` in mebibytes (bytes / 1024**2)."""
    per_column_bytes = df.memory_usage(deep=True)
    total_bytes = per_column_bytes.sum()
    return total_bytes / (1024**2)

print(f"Memory before: {mem_mb(X):.2f} MB")

# ----------------------------------------------
# 5.1 Impute Numerics
# ----------------------------------------------
# The numeric column list is captured once and reused by the downcast loop below.
num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
# Treat +/-inf as missing so they are imputed together with NaN (mutates X in place).
X.replace([np.inf, -np.inf], np.nan, inplace=True)
# Fill gaps with the column median computed over all rows currently in X.
# NOTE(review): this happens before any train/test split is visible in this
# notebook — confirm that using full-data medians is acceptable downstream.
for c in num_cols:
    med = X[c].median()
    X[c] = X[c].fillna(med)

# ----------------------------------------------
# 5.2 Downcast Numerics
# ----------------------------------------------
# Shrink each numeric column to the smallest dtype pandas selects for it;
# the branch depends on the dtype the column has after imputation.
for c in num_cols:
    if pd.api.types.is_integer_dtype(X[c]):
        X[c] = pd.to_numeric(X[c], downcast="integer")
    elif pd.api.types.is_float_dtype(X[c]):
        X[c] = pd.to_numeric(X[c], downcast="float")

# ----------------------------------------------
# 5.3 Preserve Sequence & Target Types
# ----------------------------------------------
# Re-assert string types for the two non-numeric columns.
X[MOVE_COL] = X[MOVE_COL].fillna("").astype(str)
X["target"] = X["target"].astype(str)

print(f"Memory after: {mem_mb(X):.2f} MB")
# One-row table of dtypes, followed by a short preview of the frame.
display(X.dtypes.to_frame("dtype").T)
display(X.head(3))

Memory before: 34.15 MB
Memory after: 30.47 MB


Unnamed: 0,moves_first30_san,plies_processed,cutoff_reached,captures_in_first_30_moves,checks_in_first_30_moves,target,white_elo,black_elo,elo_diff,elo_avg
dtype,object,int8,int8,int8,int8,object,int16,int16,int16,float32


Unnamed: 0,moves_first30_san,plies_processed,cutoff_reached,captures_in_first_30_moves,checks_in_first_30_moves,target,white_elo,black_elo,elo_diff,elo_avg
0,d4 Nf6 c4 e6 Nc3 c5 d5 d6 e3 e5 Nf3 Be7 Bd3 Nb...,60,1,13,3,black,2193,1782,411,1987.5
1,e4 e5 Nf3 Nc6 d4 Nxd4 Nxe5 Qe7 f4 d6 Qxd4 dxe5...,60,1,17,3,black,1548,1632,-84,1590.0
2,d4 d5 c4 Nf6 Nc3 c6 Bg5 e6 Nf3 Be7 e3 O-O c5 h...,60,1,12,2,black,1604,1918,-314,1761.0


In [63]:
# ==============================================
# 6. Sanity Checks (leakage / balance / NAs / sequence)
# ==============================================

# ----------------------------------------------
# 6.1 Leakage Guard
# ----------------------------------------------
# Raw / outcome-revealing column names that must not appear in the final frame.
banned = {
    "Result", "result", "winner", "Winner", "termination", "Termination",
    "num_moves", "NumMoves", "Opening", "opening", "ECO", "eco", "Moves", "AN",
    "UTCDate", "UTCTime", "Event", "TimeControl", "White", "Black",
    "WhiteElo", "BlackElo", "WhiteRatingDiff", "BlackRatingDiff",
}
present_banned = [name for name in X.columns if name in banned]
assert not present_banned, f"Leaky columns present: {present_banned}"
assert MOVE_COL in X.columns

# ----------------------------------------------
# 6.2 Class Balance
# ----------------------------------------------
print("Target distribution (counts):")
display(X["target"].value_counts())
print("Target distribution (percent):")
target_pct = X["target"].value_counts(normalize=True) * 100
display(target_pct.round(2))

# ----------------------------------------------
# 6.3 NA Audit
# ----------------------------------------------
na_counts = X.isna().sum()
na_counts = na_counts[na_counts > 0]
if na_counts.empty:
    print("No missing values remaining")
else:
    print("Columns with missing values after imputation:")
    display(na_counts)

# ----------------------------------------------
# 6.4 Sequence Checks
# ----------------------------------------------
total = len(X)
plies = X["plies_processed"]
empty_seq = X[MOVE_COL].str.len().eq(0).sum()
at_cutoff = plies.eq(CUTOFF_PLIES).sum()
over_cutoff = plies.gt(CUTOFF_PLIES).sum()

print(f"Empty sequences: {empty_seq}/{total}")
print(f"At cutoff ({CUTOFF_PLIES} plies): {at_cutoff}/{total}")
print(f"Over cutoff (> {CUTOFF_PLIES}): {over_cutoff}")

display(X[[MOVE_COL, "plies_processed", "target"]].head(3))

Target distribution (counts):


target
white    42008
black    41936
Name: count, dtype: int64

Target distribution (percent):


target
white    50.04
black    49.96
Name: proportion, dtype: float64

No missing values remaining
Empty sequences: 0/83944
At cutoff (60 plies): 83944/83944
Over cutoff (> 60): 0


Unnamed: 0,moves_first30_san,plies_processed,target
0,d4 Nf6 c4 e6 Nc3 c5 d5 d6 e3 e5 Nf3 Be7 Bd3 Nb...,60,black
1,e4 e5 Nf3 Nc6 d4 Nxd4 Nxe5 Qe7 f4 d6 Qxd4 dxe5...,60,black
2,d4 d5 c4 Nf6 Nc3 c6 Bg5 e6 Nf3 Be7 e3 O-O c5 h...,60,black


In [64]:
# ==============================================
# 7. Save Cleaned Data & Metadata
# ==============================================

# ----------------------------------------------
# 7.1 Write CSV
# ----------------------------------------------
X.to_csv(CLEAN_CSV, index=False)
print("Saved CSV:", CLEAN_CSV)

# Split SAN string into tokens and encode boards at CHECKPOINT_PLIES
def _san_tokens(s):
    """Split an already-cleaned, space-separated SAN string into its moves."""
    text = str(s)
    return text.split()

# One (8, 8, C) tensor per game, stacked along a new leading game axis.
boards = np.stack(
    list(map(boards_from_san_list, map(_san_tokens, X[MOVE_COL].tolist()))),
    axis=0,
)
np.savez_compressed(BOARDS_NPZ, boards=boards)
print("Saved boards:", BOARDS_NPZ, "shape:", boards.shape, "dtype:", boards.dtype)

# ----------------------------------------------
# 7.2 Write Metadata
# ----------------------------------------------
# Everything a downstream consumer needs in order to interpret the saved files.
meta = dict(
    cutoff_full_moves=int(CUTOFF_FULL_MOVES),
    cutoff_plies=int(CUTOFF_PLIES),
    rows=int(len(X)),
    columns=list(X.columns),
    move_col=MOVE_COL,
    captures_col=CAPTURES_COL,
    checks_col=CHECKS_COL,
    class_distribution=X["target"].value_counts().to_dict(),
    boards_npz=str(BOARDS_NPZ),
    board_channels=int(boards.shape[-1]),
    board_checkpoints=[int(ply) for ply in CHECKPOINT_PLIES],
)

import json
META_JSON.write_text(json.dumps(meta, indent=2), encoding="utf-8")
print("Saved meta:", META_JSON)

# ----------------------------------------------
# 7.3 Quick Preview
# ----------------------------------------------
preview_cols = [MOVE_COL, "plies_processed", "target"]
print(X.shape)
display(X.head(3))
assert MOVE_COL in X.columns
assert X["plies_processed"].max() <= CUTOFF_PLIES
print(MOVE_COL, "→ fixed length:", CUTOFF_PLIES)
display(X[preview_cols].head(3))

Saved CSV: E:\Github Projects\chess-outcome-prediction\data\chess_games_clean.csv
Saved boards: E:\Github Projects\chess-outcome-prediction\data\chess_boards_8x8xC.npz shape: (83944, 8, 8, 36) dtype: uint8
Saved meta: E:\Github Projects\chess-outcome-prediction\data\chess_games_clean_meta.json
(83944, 10)


Unnamed: 0,moves_first30_san,plies_processed,cutoff_reached,captures_in_first_30_moves,checks_in_first_30_moves,target,white_elo,black_elo,elo_diff,elo_avg
0,d4 Nf6 c4 e6 Nc3 c5 d5 d6 e3 e5 Nf3 Be7 Bd3 Nb...,60,1,13,3,black,2193,1782,411,1987.5
1,e4 e5 Nf3 Nc6 d4 Nxd4 Nxe5 Qe7 f4 d6 Qxd4 dxe5...,60,1,17,3,black,1548,1632,-84,1590.0
2,d4 d5 c4 Nf6 Nc3 c6 Bg5 e6 Nf3 Be7 e3 O-O c5 h...,60,1,12,2,black,1604,1918,-314,1761.0


moves_first30_san → fixed length: 60


Unnamed: 0,moves_first30_san,plies_processed,target
0,d4 Nf6 c4 e6 Nc3 c5 d5 d6 e3 e5 Nf3 Be7 Bd3 Nb...,60,black
1,e4 e5 Nf3 Nc6 d4 Nxd4 Nxe5 Qe7 f4 d6 Qxd4 dxe5...,60,black
2,d4 d5 c4 Nf6 Nc3 c6 Bg5 e6 Nf3 Be7 e3 O-O c5 h...,60,black
