In [1]:
import numpy as np
from tensorflow.keras import layers, Model, Sequential, backend, optimizers, config
from tensorflow.keras.layers import Input, LeakyReLU
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.metrics import accuracy_score
import pickle
import gc
import os
import json
from typing import List, Tuple, Dict

config.enable_unsafe_deserialization()

In [2]:
random_state = 93
lr = 0.001

In [3]:
def create_base_cnn(n_pieces):
    # Use Input layer to define the input shape
    inputs = Input(shape=(n_pieces,8, 8))  
    x = layers.Conv2D(32, (3, 3), padding='same')(inputs)
    x = LeakyReLU(negative_slope=0.1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(64, (3, 3), padding='same')(x)
    x = LeakyReLU(negative_slope=0.1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(128, (3, 3), padding='same')(x)
    x = LeakyReLU(negative_slope=0.1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(256)(x)
    x = LeakyReLU(negative_slope=0.1)(x)
    model = Model(inputs=inputs, outputs=x)

    return model 

In [4]:
def create_single_input_model(n_pieces):
    inputs = Input(shape=(n_pieces, 8, 8))
    base_cnn = create_base_cnn(n_pieces) 
    x = base_cnn(inputs) 
    x = layers.Dense(64)(x)
    x = LeakyReLU(negative_slope=0.1)(x)
    x = layers.Dense(1, activation='tanh')(x) # Output between -1 and 1

    model = Model(inputs=inputs, outputs=x)
    model.compile(optimizer=optimizers.Adam(learning_rate=lr), loss='mse', metrics=['mse'])
    return model

In [5]:
def create_pairwise_model(version, n_pieces):
    base_cnn = create_base_cnn(n_pieces) 
    input_a = Input(shape=(n_pieces,8, 8)) 
    input_b = Input(shape=(n_pieces,8, 8))

    encoded_a = base_cnn(input_a)
    encoded_b = base_cnn(input_b)

    diff = layers.Subtract()([encoded_a, encoded_b])
    mult = layers.Multiply()([encoded_a, encoded_b])
    merged = layers.Concatenate()([diff, mult])

    x = layers.Dense(128)(merged)
    x = LeakyReLU(negative_slope=0.1)(x)
    x = layers.Dense(32)(x)
    x = LeakyReLU(negative_slope=0.1)(x)

    if version == "Regression":
        output = layers.Dense(1, activation='tanh')(x) #Output between -1 and 1 
        loss = 'mse'
        metrics = ['mse']
    elif version == "Classification":
        output = layers.Dense(3, activation='softmax')(x)
        loss = SparseCategoricalCrossentropy()
        metrics = ['accuracy']
    else:
        raise ValueError(f"Unknown version: {version}")

    model = Model(inputs=[input_a, input_b], outputs=output)
    model.compile(optimizer=optimizers.Adam(learning_rate=lr), loss=loss, metrics=metrics)
    return model

In [6]:
def find_best_threshold_v1(X1, X2, y_true):
    # Finds the best threshold by maximizing accuracy
    best_threshold, best_score = None, 0

    thresholds = np.linspace(0, 0.05, 200)

    for threshold in thresholds:
        y_pred = np.where(np.abs(X1 - X2) < threshold, 0, np.where(X1 > X2, 1, 2))
        score = accuracy_score(y_true, y_pred)
        if score > best_score:
            best_score = score
            best_threshold = threshold

    return best_threshold, best_score

In [7]:
def find_best_thresholds_v2(predictions, labels):
    # Finds the best threshold1 and threshold2 using a grid search by maximizing accuracy
    thresholds1 = np.linspace(-0.8, 0, 200)
    thresholds2 = np.linspace(0, 0.8, 200)

    best_t1, best_t2, best_acc = None, None, 0

    # Vectorized grid search
    T1, T2 = np.meshgrid(thresholds1, thresholds2)
    valid = T1 < T2
    T1, T2 = T1[valid], T2[valid]

    for t1, t2 in zip(T1, T2):
        predicted_labels = np.where(predictions > t2, 2, np.where(predictions < t1, 1, 0))
        acc = accuracy_score(labels, predicted_labels)
        if acc > best_acc:
            best_t1, best_t2, best_acc = t1, t2, acc

    return best_t1, best_t2, best_acc

In [8]:
def create_post_nn(n):
    model = Sequential([
        layers.Input(shape=(n,)),
        layers.Dense(8),
        layers.LeakyReLU(negative_slope=0.1),
        layers.Dense(3, activation='softmax')
        ])

    model.compile(
        optimizer=optimizers.Adam(learning_rate=lr),
        loss=SparseCategoricalCrossentropy(),  
        metrics=['accuracy']
    )

    return model

**FOR ALL MODELS**

In [9]:
batch_sizes = [int(32*(2**i)) for i in range(5)]
print(batch_sizes)

[32, 64, 128, 256, 512]


In [10]:
def training(model, X, y, epochs_per_batch=5, acc=False):
    hist = {'loss': [],'val_loss': []}  
    if acc:
        hist['accuracy'] = []
        hist['val_accuracy'] = []
         
    # EarlyStopping callback
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=5, 
        restore_best_weights=True,
        verbose=1
    )

    for n, batch_size in enumerate(batch_sizes):
        print(f"Training with batch size: {batch_size}, epochs {n*epochs_per_batch+1} to {(n+1)*epochs_per_batch}")
        # Train your model with the current batch size
        epoch_history = model.fit(
            X, y,
            batch_size=batch_size,
            epochs=(n+1)*epochs_per_batch,
            initial_epoch=n*epochs_per_batch,  
            validation_split=0.15,
            verbose=1,
            callbacks = [early_stopping]
        )

        # Append the results to the history dictionary
        hist['loss'].extend(epoch_history.history['loss'])
        hist['val_loss'].extend(epoch_history.history['val_loss'])
        if acc:
            hist['accuracy'].extend(epoch_history.history['accuracy'])
            hist['val_accuracy'].extend(epoch_history.history['val_accuracy'])
        if model.stop_training:
            print("Early stopping triggered.")
            break
        
    return model, hist

In [11]:
def load_endgame_data(endgame: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Load position and pair data for an endgame from .npz files.

    Args:
        endgame (str): Name of the endgame (e.g., 'KRK').

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
            - positions: Position data (shape: (N, channels, 8, 8)).
            - dtm_values: DTM values (shape: (N,)).
            - positions1: First positions for pairs (shape: (M, channels, 8, 8)).
            - positions2: Second positions for pairs (shape: (M, channels, 8, 8)).
            - labels: Pair labels (shape: (M,), values in {0, 1, 2}).
    """
    
    data = np.load(fr"DataBase\{endgame}_position_train_val.npz")
    positions, dtm_values = data["positions"], data["dtm_values"]
    print(f"Loaded {endgame} position data with {dtm_values.shape[0]} samples")

    pair_data = np.load(fr"DataBase\{endgame}_pairs_train_val.npz")
    positions1, positions2, labels = pair_data["positions1"], pair_data["positions2"], pair_data["labels"]
    print(f"Loaded {endgame} pair data with {labels.shape[0]} samples")

    return positions, dtm_values, positions1, positions2, labels

def train_model(model, inputs, labels: np.ndarray, model_name: str, endgame: str, track_accuracy: bool, force_train: bool = False):
    """
    Train a model and save its weights and history.

    Args:
        model: Keras model to train.
        inputs: Input data (single array or tuple of two arrays for pairwise models).
        labels: Target labels.
        model_name: Name of the model (e.g., 'single', 'regression').
        endgame: Name of the endgame (e.g., 'KRK').
        track_accuracy: Whether to track accuracy (for classification models).

    Returns:
        Trained model and history.

    """    
    model_path = fr"Models\{endgame}_model_{model_name}.keras"
    if not force_train and os.path.exists(model_path):
        print(f"Loading existing {model_name} model for {endgame} from {model_path}.")
        model = load_model(model_path)
        return model
    model, history = training(model, inputs, labels, acc=track_accuracy)

    model.save(model_path)
    history_path = fr"Histories\{endgame}_history_{model_name}.pkl"
    with open(history_path, "wb") as f:
        pickle.dump(history, f)
    print(f"Saved model to {model_path} and history to {history_path}")

    return model

def post_process_single(model, positions1: np.ndarray, positions2: np.ndarray, labels: np.ndarray, max_samples: int, endgame: str, model_name: str, post_nn_epochs: int, post_nn_batch_size: int) -> Dict[str, float]:
    """
    Apply post-processing for single-input model (thresholding and neural network).

    Args:
        model: Trained single-input model.
        positions1: First positions (shape: (M, channels, 8, 8)).
        positions2: Second positions (shape: (M, channels, 8, 8)).
        labels: True labels (shape: (M,), values in {0, 1, 2}).
        max_samples: Maximum number of samples for post-processing.
        model_dir: Directory to save the post-processing model.
        endgame: Name of the endgame.
        model_name: Name of the model.
        post_nn_epochs: Epochs for post-processing neural network.
        post_nn_batch_size: Batch size for post-processing neural network.

    Returns:
        Dict[str, float]: Threshold configuration (threshold1, threshold2).

    """
    sample_indices = np.arange(min(max_samples, labels.shape[0]))
    y1_pred = model.predict(positions1[sample_indices], batch_size=32)
    y2_pred = model.predict(positions2[sample_indices], batch_size=32)
    y_pred_comb = np.hstack([y1_pred, y2_pred])

    # Threshold method
    best_threshold, best_accuracy = find_best_threshold_v1(y1_pred, y2_pred, labels[sample_indices])
    print(f"Single model thresholds: threshold={best_threshold:.4f}, Accuracy={best_accuracy:.4f}")
    threshold_config = {"threshold1": float(best_threshold), "threshold2": 0.0}

    # Post-processing neural network
    post_model = create_post_nn(2)
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True, verbose=1)
    post_model.fit(y_pred_comb, labels[sample_indices], epochs=post_nn_epochs,
                  batch_size=post_nn_batch_size, validation_split=0.15, callbacks=[early_stopping])
    post_model.save(fr"Models\{endgame}_model_pro_{model_name}.keras")

    return threshold_config

def post_process_regression(model, positions1: np.ndarray, positions2: np.ndarray, labels: np.ndarray, max_samples: int, endgame: str, model_name: str, post_nn_epochs: int, post_nn_batch_size: int) -> Dict[str, float]:
    """
    Apply post-processing for regression model (thresholding and neural network).

    Args:
        model: Trained regression model.
        positions1: First positions (shape: (M, channels, 8, 8)).
        positions2: Second positions (shape: (M, channels, 8, 8)).
        labels: True labels (shape: (M,), values in {0, 1, 2}).
        max_samples: Maximum number of samples for post-processing.
        model_dir: Directory to save the post-processing model.
        endgame: Name of the endgame.
        model_name: Name of the model.
        post_nn_epochs: Epochs for post-processing neural network.
        post_nn_batch_size: Batch size for post-processing neural network.

    Returns:
        Dict[str, float]: Threshold configuration (threshold1, threshold2).
    """
    sample_indices = np.arange(min(max_samples, labels.shape[0]))
    y_pred = model.predict([positions1[sample_indices], positions2[sample_indices]], batch_size=32)

    # Threshold method
    best_t1, best_t2, best_acc = find_best_thresholds_v2(y_pred, labels[sample_indices])
    print(f"Regression model thresholds: t1={best_t1:.4f}, t2={best_t2:.4f}, Accuracy={best_acc:.4f}")
    threshold_config = {"threshold1": float(best_t1), "threshold2": float(best_t2)}

    # Post-processing neural network
    post_model = create_post_nn(1)
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True, verbose=1)
    post_model.fit(y_pred, labels[sample_indices], epochs=post_nn_epochs,
                  batch_size=post_nn_batch_size, validation_split=0.15, callbacks=[early_stopping])
    post_model.save(fr"Models\{endgame}_model_pro_{model_name}.keras")

    return threshold_config

def save_threshold_config(threshold_config: Dict) -> None:
    """
    Save threshold configuration to JSON file, preserving existing entries.

    Args:
        threshold_config (Dict): Threshold configuration dictionary.
        config_dir (str): Directory to save thresholds.json.

    Raises:
        OSError: If file saving fails.
    """
    threshold_config_path = r"Configs\thresholds.json"
    existing_config = {}
    if os.path.exists(threshold_config_path):
        with open(threshold_config_path, "r") as f:
            existing_config = json.load(f)
        
    existing_config.update(threshold_config)
    with open(threshold_config_path, "w") as f:
        json.dump(existing_config, f, indent=4)
    print(f"Updated threshold config saved to {threshold_config_path}")
    
def train_endgame_models(endgames: List[str], model_types: List, max_samples: int = 100_000, post_nn_epochs: int = 5, post_nn_batch_size: int = 32, force_train: bool = False) -> Dict:
    """
    Train models for multiple chess endgames, apply post-processing, and save results.

    Args:
        endgames (List[str]): List of endgame names (e.g., ['KRK', 'KPKR']).
        model_types (List[Tuple[str, Callable]]): List of (model_name, create_function) pairs.
        data_dir (str): Directory containing .npz data files (default: 'DataBase').
        model_dir (str): Directory to save trained models (default: 'Models').
        history_dir (str): Directory to save training histories (default: 'Histories').
        config_dir (str): Directory to save threshold configurations (default: 'Configs').
        max_samples (int): Maximum number of samples for post-processing (default: 100,000).
        post_nn_epochs (int): Epochs for post-processing neural network (default: 10).
        post_nn_batch_size (int): Batch size for post-processing neural network (default: 32).

    Returns:
        Dict: Threshold configurations for each endgame and model.
    """
    threshold_config = {}

    for endgame in endgames:
        # Load data
        positions, dtm_values, positions1, positions2, labels = load_endgame_data(endgame)

        for model_name, create_func in model_types:

            print(f"\nTraining {model_name} model for {endgame}")
            model = create_func(positions.shape[1])
            # Transform labels for regression model
            if model_name == "regression":
                # Map: 0=equal, 1=pos1 better, 2=pos2 better to 0=equal, -1=pos1 better, 1=pos2 better
                labels_trans = np.where(labels == 1, -1, np.where(labels == 2, 1, 0))
            elif model_name == "single":
                labels_trans = dtm_values
            else:
                labels_trans = labels

            # Train model
            inputs = positions if model_name == "single" else [positions1, positions2]
            model = train_model(model, inputs, labels_trans, model_name, endgame,track_accuracy = model_name == "classification",force_train=force_train)

            # Post-processing
            if model_name == "single":
                thresholds = post_process_single(model, positions1, positions2, labels, max_samples,
                                               endgame, model_name, post_nn_epochs, post_nn_batch_size)
            elif model_name == "regression":
                thresholds = post_process_regression(model, positions1, positions2, labels, max_samples,
                                                   endgame, model_name, post_nn_epochs, post_nn_batch_size)
            else:
                thresholds = {"threshold1": 0.0, "threshold2": 0.0}  # No thresholding for classification

            threshold_config.setdefault(endgame, {})[model_name] = thresholds
            save_threshold_config(threshold_config)

            backend.clear_session()
            gc.collect()

    return threshold_config

In [12]:
endgames = ['KRK', 'KQKR', 'KRKP', 'KPK','KPKP', 'KNKP']
model_types = [
    ("single", lambda n: create_single_input_model(n)),
    ("regression", lambda n: create_pairwise_model("Regression", n)),
    ("classification", lambda n: create_pairwise_model("Classification", n)),
]
train_endgame_models(endgames, model_types, force_train=True)

Loaded KRK position data with 50500 samples
Loaded KRK pair data with 450000 samples

Training single model for KRK
Training with batch size: 32, epochs 1 to 5
Epoch 1/5
[1m1342/1342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 4ms/step - loss: 0.0179 - mse: 0.0179 - val_loss: 0.0023 - val_mse: 0.0023
Epoch 2/5
[1m1342/1342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - loss: 0.0022 - mse: 0.0022 - val_loss: 0.0018 - val_mse: 0.0018
Epoch 3/5
[1m1342/1342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - loss: 0.0015 - mse: 0.0015 - val_loss: 0.0012 - val_mse: 0.0012
Epoch 4/5
[1m1342/1342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - loss: 0.0013 - mse: 0.0013 - val_loss: 0.0018 - val_mse: 0.0018
Epoch 5/5
[1m1342/1342[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 5ms/step - loss: 0.0013 - mse: 0.0013 - val_loss: 0.0012 - val_mse: 0.0012
Restoring model weights from the end of the best epoch: 3.
Training with ba

{'KRK': {'single': {'threshold1': 0.0005025125628140704, 'threshold2': 0.0},
  'regression': {'threshold1': -0.5829145728643217,
   'threshold2': 0.46633165829145734},
  'classification': {'threshold1': 0.0, 'threshold2': 0.0}},
 'KQKR': {'single': {'threshold1': 0.0020100502512562816, 'threshold2': 0.0},
  'regression': {'threshold1': -0.16884422110552766,
   'threshold2': 0.3216080402010051},
  'classification': {'threshold1': 0.0, 'threshold2': 0.0}},
 'KRKP': {'single': {'threshold1': 0.0, 'threshold2': 0.0},
  'regression': {'threshold1': -0.31758793969849247,
   'threshold2': 0.36180904522613067},
  'classification': {'threshold1': 0.0, 'threshold2': 0.0}},
 'KPK': {'single': {'threshold1': 0.004271356783919598, 'threshold2': 0.0},
  'regression': {'threshold1': -0.4984924623115578,
   'threshold2': 0.43015075376884426},
  'classification': {'threshold1': 0.0, 'threshold2': 0.0}},
 'KPKP': {'single': {'threshold1': 0.00628140703517588, 'threshold2': 0.0},
  'regression': {'thresh