In [None]:
import chess
import chess.pgn
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
from utils import fen_to_binary_features

In [None]:
def count_games_in_pgn(file_path):
    with open(file_path, "r") as file:
        content = file.read()
    return content.count("[Event")

In [None]:
def pgn_to_dataframe(pgn_file_path):
    """
    Parse a PGN file and create a DataFrame where each row represents a position.
    Columns in the DataFrame:
        - game_id
        - move_number
        - board_fen
        - move
        - result (e.g. '1-0', '0-1', '1/2-1/2')
    """
    all_positions = []
    game_id = 0

    with open(pgn_file_path, "r", encoding="utf-8") as pgn:
        for i in tqdm(range(count_games_in_pgn(pgn_file_path))):
            game = chess.pgn.read_game(pgn)
            if game is None:
                break

            game_id += 1
            result = game.headers.get("Result", "*")
            board = game.board()

            move_number = 0
            for move in game.mainline_moves():
                board.push(move)
                move_number += 1
                position_data = {
                    "game_id": game_id,
                    "move_number": move_number,
                    "board_fen": board.fen(),
                    "move": move.uci(),
                    "result": result,
                }
                all_positions.append(position_data)

    df = pd.DataFrame(all_positions)
    return df

In [None]:
# pgn_path = r"C:\Users\forbe\Downloads\lichess_elite_2024-10\lichess_elite_2024-10.pgn"
# df_positions = pgn_to_dataframe(pgn_path)
# df_positions.to_parquet(r"C:\Users\forbe\OneDrive\Personal\Documents\repos\chess_data\df_positions.parquet")

In [None]:
# df_positions = pd.read_parquet(r"C:\Users\forbe\OneDrive\Personal\Documents\repos\chess_data\df_positions.parquet")

In [None]:
RESULT_MAPPING = {"1-0": 2, "0-1": 0, "1/2-1/2": 1}


def split_dataframe(df, chunk_size):
    return [df[i : i + chunk_size] for i in range(0, len(df), chunk_size)]


def df_fen_to_binary_perspective(
    df: pd.DataFrame, fen_column: str = "board_fen", result_column: str = "result"
) -> pd.DataFrame:
    """
    Given a DataFrame that has columns:
      - fen_column (default 'board_fen'): FEN strings
      - result_column (default 'result'): game result
    Convert each FEN to a 780-dim feature vector from the side-to-move perspective.

    Adjust the result to reflect the perspective of the side to move.

    Returns a new DataFrame with:
      - 768 columns for piece placement
      - 4 columns for castling rights
      - 8 columns for en passant
      - 'result' column (adjusted to the side to move)
    """
    feature_list = []
    adjusted_results = []

    for fen, result in zip(df[fen_column], df[result_column]):
        # Extract features from the perspective of the side to move
        board = chess.Board(fen)
        side_to_move = board.turn  # True for White, False for Black

        if not side_to_move:  # Black to move
            # Adjust the result: swap "1-0" with "0-1"
            if result == "1-0":
                adjusted_result = "0-1"
            elif result == "0-1":
                adjusted_result = "1-0"
            else:
                adjusted_result = result  # Draw remains the same
        else:
            adjusted_result = result  # White to move, no adjustment needed

        # Convert FEN to binary features
        feature_vec = fen_to_binary_features(board.fen())
        feature_list.append(feature_vec)
        adjusted_results.append(adjusted_result)

    # Convert the list of numpy arrays to a 2D array
    feature_array = np.vstack(feature_list)

    # Create column names
    column_names = (
        [f"piece_placement_{i}" for i in range(768)]
        + [f"castling_{i}" for i in range(4)]
        + [f"en_passant_{i}" for i in range(8)]
    )

    # Build the DataFrame of features
    df_features = pd.DataFrame(feature_array, columns=column_names, index=df.index)

    # Add the adjusted result column, mapping it to numeric labels
    df_features[result_column] = adjusted_results
    df_features[result_column] = df_features[result_column].map(RESULT_MAPPING)

    return df_features

In [None]:
# df_fen_to_binary_perspective(df_positions.iloc[:100]).diff()

In [None]:
# top_positions = list(df_positions[df_positions.move_number == 4].board_fen.value_counts().index)

In [None]:
# chess.Board(top_positions[2])

In [None]:
# df_features = pd.concat([df_fen_to_binary_perspective(sub_df) for sub_df in tqdm(split_dataframe(df_positions.sample(frac=1, random_state=42).reset_index(drop=True), 100_000)[:10])])

In [None]:
# df_features.shape

In [None]:
# df_features.to_parquet(r"C:\Users\forbe\OneDrive\Personal\Documents\repos\chess_data\df_features.parquet")

In [None]:
def train_mlp_classifier(X, Y):
    # Split train/test
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        Y,
        test_size=0.20,
        random_state=42,
    )

    # Optional: scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Initialize MLP
    mlp = MLPClassifier(
        hidden_layer_sizes=(16), random_state=42, verbose=True, early_stopping=True
    )

    print("ready to train")

    # Fit (train) the model
    mlp.fit(X_train_scaled, y_train)

    # Evaluate
    accuracy = mlp.score(X_test_scaled, y_test)
    print(f"Test Accuracy: {accuracy:.3f}")

    y_pred = mlp.predict(X_test_scaled)
    print("Classification Report:")
    print(classification_report(y_test, y_pred, digits=3))

    # Return the trained classifier and scaler
    return mlp, scaler

In [None]:
from sklearn.linear_model import LogisticRegression

In [None]:
df_features = pd.read_parquet(
    r"C:\Users\forbe\OneDrive\Personal\Documents\repos\chess_data\df_features.parquet"
)

In [None]:
df_features

In [None]:
# pca = PCA(n_components=600)  # Number of components you want
# pca_features = pca.fit_transform(df_features[list(df_features.columns)[:-1]].values)

In [None]:
piece_categories = [
    "player_pawns",
    "player_knights",
    "player_bishops",
    "player_rooks",
    "player_queens",
    "player_kings",
    "opponent_pawns",
    "opponent_knights",
    "opponent_bishops",
    "opponent_rooks",
    "opponent_queens",
    "opponent_kings",
]

In [None]:
# for i, piece in enumerate(piece_categories):
#     df_features[piece] =  np.sum(df_features.iloc[:,list(np.arange(64)*12 + i)], axis=1)#np.sum(df_features[df_features.columns[i*64:i*64+64]], axis=1)

In [None]:
piece_feature_lookup = {}

for i, piece in enumerate(piece_categories):
    piece_feature_lookup[piece] = df_features.iloc[
        :, list(np.arange(64) * 12 + i)
    ].values

In [None]:
total_pieces = np.sum(df_features[df_features.columns[:728]], axis=1)

In [None]:
total_pieces = np.sum(df_features[df_features.columns[:728]], axis=1)

In [None]:
# X = df_features[df_features.columns[:-1]].values #
X = piece_feature_lookup["player_kings"][total_pieces < 10]
Y = df_features["result"][total_pieces < 10]

In [None]:
logistic_model = LogisticRegression().fit(X, Y)

In [None]:
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

cmap = plt.cm.RdYlGn
plot_array = logistic_model.coef_[2].reshape(8, 8)[::-1]
norm = mcolors.Normalize(vmin=np.min(plot_array), vmax=np.max(plot_array))

plt.figure(figsize=(8, 8))
plt.imshow(plot_array, cmap=cmap, norm=norm)
plt.show()

In [None]:
logistic_scaler = StandardScaler()
logistic_model = LogisticRegression().fit(logistic_scaler.fit_transform(X), Y)

In [None]:
logistic_model.coef_[0]

In [None]:
logistic_model.predict_proba(logistic_scaler.transform(X[5000].reshape(1, -1)))

In [None]:
# with open("pkl/logistic_model.pkl", "wb") as f:
#     pickle.dump(logistic_model, f)

# with open("pkl/logistic_scaler.pkl", "wb") as f:
#     pickle.dump(logistic_scaler, f)

In [None]:
# mlp_model, feature_scaler = train_mlp_classifier(X, Y)

In [None]:
# with open("pkl/mlp_model.pkl", "wb") as f:
#     pickle.dump(mlp_model, f)

# with open("pkl/mlp_scaler.pkl", "wb") as f:
#     pickle.dump(feature_scaler, f)