# Análisis de historial de predicciones tácticas (`predicciones.csv`)

Este notebook explora el historial generado por el modelo de predicción de errores tácticos en `chess_trainer`.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Read the prediction history and convert its "timestamp" column to datetimes
# in a single chained expression.
df = pd.read_csv("../data/predicciones.csv").assign(
    timestamp=lambda frame: pd.to_datetime(frame["timestamp"])
)
df.head()

## Frecuencia de etiquetas tácticas predichas

In [None]:
# Bars are ordered from the most frequent to the least frequent predicted label.
label_order = df["predicted_label"].value_counts().index
ax = sns.countplot(data=df, x="predicted_label", order=label_order)
ax.set_title("Distribución de etiquetas tácticas")
plt.xticks(rotation=30)
ax.figure.tight_layout()
plt.show()

## Evolución temporal por etiqueta

In [None]:
# Rows are calendar days and columns are labels; (day, label) pairs that never
# occur are filled with 0 so the stacked bars can be drawn.
prediction_day = df["timestamp"].dt.date
etiquetas_por_fecha = (
    df.groupby(prediction_day)["predicted_label"]
    .value_counts()
    .unstack()
    .fillna(0)
)
ax = etiquetas_por_fecha.plot(kind="bar", stacked=True, figsize=(12, 6))
ax.set_title("Etiquetas tácticas por día")
ax.set_xlabel("Fecha")
ax.set_ylabel("Cantidad")
plt.xticks(rotation=45)
ax.figure.tight_layout()
plt.show()

## Boxplot de score_diff por etiqueta

In [None]:
# One box per predicted label, showing how score_diff is distributed within it.
ax = sns.boxplot(data=df, x="predicted_label", y="score_diff")
ax.set_title("score_diff por tipo de error predicho")
plt.xticks(rotation=30)
ax.figure.tight_layout()
plt.show()

## Histograma de branching_factor

In [None]:
# 20-bin histogram of branching_factor with a kernel density estimate overlaid.
ax = sns.histplot(data=df, x="branching_factor", bins=20, kde=True)
ax.set_title("Distribución del branching_factor")
ax.figure.tight_layout()
plt.show()

## Correlaciones numéricas

In [None]:
# Pairwise correlations of the numeric columns only, annotated with their values.
corr = df.corr(numeric_only=True)
ax = sns.heatmap(corr, annot=True, cmap="coolwarm")
ax.set_title("Mapa de calor de correlaciones")
ax.figure.tight_layout()
plt.show()

## Conclusiones

- Se observan más errores graves (score_diff negativos) en jugadas con menor branching_factor.
- Las etiquetas tácticas más frecuentes son las intermedias como 'Error' o 'Aceptable'.
- Hay estabilidad en la distribución temporal, aunque algunos días muestran concentración de errores impulsivos.


In [None]:
#!/bin/bash
# Downloads the `ronakbadhe/chess-evaluations` dataset with the Kaggle CLI.
# NOTE(review): the line below is a shell command, not Python. It only runs if
# this cell is executed by a shell (for example under a `%%bash` cell magic or
# with a leading `!`) — confirm how this cell is meant to be executed.
kaggle datasets download ronakbadhe/chess-evaluations

In [None]:
# Install the `chess` package that the following cells import.
# NOTE(review): the version is not pinned; consider `%pip install chess==<version>`
# so the install targets the kernel's environment and is reproducible.
!pip install chess

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers
import pandas as pd
import chess
import numpy as np

In [None]:
# Channel index of every piece symbol in the board tensor: the uppercase symbols
# take channels 0-5 and the lowercase symbols take channels 6-11.
piece_to_index = {symbol: channel for channel, symbol in enumerate("PNBRQKpnbrqk")}


def fen_to_tensor(fen):
    """Encode a FEN string as a (13, 8, 8) float32 tensor plus two scalars.

    Channels 0-11 are one-hot piece planes indexed through ``piece_to_index``.
    Channel 12 is shared by the en-passant target square and by one marker
    square per castling right. Row 0 of every plane corresponds to rank 8 and
    column 0 to file 'a'.

    Args:
        fen: FEN string; the turn, castling, en-passant and halfmove fields
            are read directly from its whitespace-separated parts.

    Returns:
        A tuple ``(board_tensor, active_player, halfmove_clock)`` where
        ``active_player`` is 1 when the side-to-move field is 'w' and 0
        otherwise, and ``halfmove_clock`` is the FEN halfmove counter
        divided by 100.
    """
    position = chess.Board(fen)
    planes = np.zeros((13, 8, 8), dtype=np.float32)

    # Piece planes: only occupied squares are visited.
    for square, piece in position.piece_map().items():
        plane = piece_to_index[piece.symbol()]
        planes[plane, 7 - chess.square_rank(square), chess.square_file(square)] = 1

    # Scalar features taken straight from the FEN fields.
    fields = fen.split()
    active_player = 1 if fields[1] == 'w' else 0
    halfmove_clock = float(fields[4]) / 100.0
    ep_target = fields[3]
    castling_field = fields[2]

    # En-passant target, e.g. "e3" -> row 8 - 3 = 5, column 4.
    if ep_target != '-':
        planes[12, 8 - int(ep_target[1]), ord(ep_target[0]) - ord('a')] = 1

    # Castling rights: each available right lights up one fixed marker square.
    castling_markers = {'K': (7, 6), 'Q': (7, 2), 'k': (0, 6), 'q': (0, 2)}
    if castling_field != '-':
        for right in castling_field:
            marker_row, marker_col = castling_markers[right]
            planes[12, marker_row, marker_col] = 1

    return planes, active_player, halfmove_clock

## Load dataset from CSV

In [None]:

def _parse_evaluation(raw, mate_value=10000.0):
    """Convert one raw evaluation entry into a float score.

    Entries starting with '#' denote a forced mate and are mapped to
    ``-mate_value`` when the sign after '#' is '-', and to ``+mate_value``
    otherwise. Anything else is parsed as a plain number.
    """
    # str() makes this work for numeric cells too; surrounding whitespace and a
    # stray byte-order mark would otherwise break the '#' test or float().
    text = str(raw).strip(' \t\r\n\ufeff')
    if text.startswith('#'):
        # Slicing (instead of indexing) keeps a bare "#" from raising.
        return -mate_value if text[1:2] == '-' else mate_value
    return float(text)


def load_data(csv_path,sample_size=1000):
    """Load a random sample of evaluated positions from a CSV file.

    The CSV must provide a ``FEN`` and an ``Evaluation`` column. Rows missing
    either value are dropped before sampling.

    Args:
        csv_path: Path of the CSV file to read.
        sample_size: Number of rows to sample (seeded with 42). Values larger
            than the number of usable rows are clamped to that number instead
            of raising.

    Returns:
        A tuple ``(boards, active_players, halfmove_clocks, evaluations)`` of
        NumPy arrays, aligned row by row. The first three come from
        ``fen_to_tensor``; ``evaluations`` holds the parsed scores.
    """
    data = pd.read_csv(csv_path).dropna(subset=['FEN', 'Evaluation'])
    if sample_size is not None:
        sample_size = min(sample_size, len(data))
    data = data.sample(n=sample_size, random_state=42)

    boards, active_players, halfmove_clocks, evaluations = [], [], [], []
    # Iterating over the two columns avoids the per-row Series that iterrows() builds.
    for fen, raw_evaluation in zip(data['FEN'], data['Evaluation']):
        board_tensor, active_player, halfmove_clock = fen_to_tensor(fen)
        boards.append(board_tensor)
        active_players.append(active_player)
        halfmove_clocks.append(halfmove_clock)
        evaluations.append(_parse_evaluation(raw_evaluation))

    boards = np.array(boards)
    active_players = np.array(active_players)
    halfmove_clocks = np.array(halfmove_clocks)
    evaluations = np.array(evaluations)
    return boards, active_players, halfmove_clocks, evaluations


## Data loader + Label encoder

In [None]:
# Custom Conditional Batch Norm Layer
class ConditionalBatchNorm(layers.Layer):
    """Batch normalisation whose scale and shift are looked up per condition id.

    The wrapped ``BatchNormalization`` is created with ``center=False`` and
    ``scale=False``, so it only normalises; the affine part comes from two
    embedding tables (``gamma`` initialised to ones, ``beta`` to zeros) that
    are indexed by ``condition``.
    """

    def __init__(self, num_features, num_conditions):
        """Create the normaliser and the per-condition gamma/beta tables.

        Args:
            num_features: Width of each gamma/beta embedding vector.
            num_conditions: Number of rows in each embedding table.
        """
        super().__init__()
        self.num_features = num_features
        self.bn = layers.BatchNormalization(center=False, scale=False)
        self.gamma = layers.Embedding(num_conditions, num_features, embeddings_initializer='ones')
        self.beta = layers.Embedding(num_conditions, num_features, embeddings_initializer='zeros')

    def call(self, x, condition):
        """Normalise ``x`` and apply the gamma/beta rows selected by ``condition``.

        Returns ``gamma * bn(x) + beta``.
        """
        normalized = self.bn(x)
        # Two singleton axes are inserted between the first and last axis of the
        # lookup so it can broadcast against `normalized`.
        # assumes x is 4-D with features on the last axis and condition is a
        # 1-D batch of integer ids — TODO confirm
        gamma = self.gamma(condition)[:, tf.newaxis, tf.newaxis, :]
        beta = self.beta(condition)[:, tf.newaxis, tf.newaxis, :]
        return gamma * normalized + beta

## Model architecture
**Conditional Batch Normalization**
Used to distinguish between black & white turns to play when training the model

In [None]:
# Model Architecture
class ChessEvaluationCNN(Model):
    """Three conv + conditional-batch-norm stages followed by a two-layer dense head.

    ``call`` expects ``inputs`` to be a ``(board_tensor, active_player,
    halfmove_clock)`` triple.

    NOTE(review): the Conv2D layers are created without ``data_format`` and so
    use the Keras default layout. If the board tensors are channels-first
    (e.g. ``(batch, 13, 8, 8)``) while that default is channels_last, the
    convolutions treat the last axis as channels — confirm the intended layout.
    """
    def __init__(self, num_piece_channels=13, num_classes=1, num_conditions=2):
        """Build the layers.

        Args:
            num_piece_channels: Not read anywhere in this class.
            num_classes: Number of units of the output layer ``fc2``.
            num_conditions: Number of condition ids each ConditionalBatchNorm handles.
        """
        super(ChessEvaluationCNN, self).__init__()

        # Convolutional layers
        self.conv1 = layers.Conv2D(64, kernel_size=3, padding='same')
        self.cbn1 = ConditionalBatchNorm(64, num_conditions)
        self.conv2 = layers.Conv2D(128, kernel_size=3, padding='same')
        self.cbn2 = ConditionalBatchNorm(128, num_conditions)
        self.conv3 = layers.Conv2D(256, kernel_size=3, padding='same')
        self.cbn3 = ConditionalBatchNorm(256, num_conditions)

        # Fully connected layers
        # NOTE(review): `flatten` is created but never used in `call`.
        self.flatten = layers.Flatten()
        self.fc1 = layers.Dense(1024, activation='relu')
        self.fc2 = layers.Dense(num_classes)

    def call(self, inputs):
        """Run the forward pass and return the output of ``fc2``."""
        board_tensor, active_player, halfmove_clock = inputs

        # Forward pass: each stage is conv -> conditional batch norm -> ReLU,
        # with `active_player` selecting the normalisation parameters.
        x = self.conv1(board_tensor)
        x = self.cbn1(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv2(x)
        x = self.cbn2(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv3(x)
        x = self.cbn3(x, active_player)
        x = tf.nn.relu(x)

        # Global average pooling
        x = tf.reduce_mean(x, axis=[1, 2])  # (batch_size, 256)

        # Fully connected layer with halfmove clock: the clock is appended as one
        # extra column to the fc1 activations before the output layer.
        x = tf.concat([self.fc1(x), tf.expand_dims(halfmove_clock, -1)], axis=1)
        output = self.fc2(x)

        return output

## Original pure CNN architecture:
This version used a convolutional network with a kernel size of 3 to learn the position's features. In theory, this is useful for identifying local features such as pawn chains and structures, but it cannot make sense of long-range relationships like threats, pins and attacks.

In [None]:
# Model Architecture
# NOTE(review): a class with the same name and the same body is already defined in
# an earlier cell of this notebook; running this cell simply rebinds the name.
class ChessEvaluationCNN(Model):
    """Three conv + conditional-batch-norm stages followed by a two-layer dense head.

    ``call`` expects ``inputs`` to be a ``(board_tensor, active_player,
    halfmove_clock)`` triple.

    NOTE(review): the Conv2D layers are created without ``data_format`` and so
    use the Keras default layout. If the board tensors are channels-first
    (e.g. ``(batch, 13, 8, 8)``) while that default is channels_last, the
    convolutions treat the last axis as channels — confirm the intended layout.
    """
    def __init__(self, num_piece_channels=13, num_classes=1, num_conditions=2):
        """Build the layers.

        Args:
            num_piece_channels: Not read anywhere in this class.
            num_classes: Number of units of the output layer ``fc2``.
            num_conditions: Number of condition ids each ConditionalBatchNorm handles.
        """
        super(ChessEvaluationCNN, self).__init__()

        # Convolutional layers
        self.conv1 = layers.Conv2D(64, kernel_size=3, padding='same')
        self.cbn1 = ConditionalBatchNorm(64, num_conditions)
        self.conv2 = layers.Conv2D(128, kernel_size=3, padding='same')
        self.cbn2 = ConditionalBatchNorm(128, num_conditions)
        self.conv3 = layers.Conv2D(256, kernel_size=3, padding='same')
        self.cbn3 = ConditionalBatchNorm(256, num_conditions)

        # Fully connected layers
        # NOTE(review): `flatten` is created but never used in `call`.
        self.flatten = layers.Flatten()
        self.fc1 = layers.Dense(1024, activation='relu')
        self.fc2 = layers.Dense(num_classes)

    def call(self, inputs):
        """Run the forward pass and return the output of ``fc2``."""
        board_tensor, active_player, halfmove_clock = inputs

        # Forward pass: each stage is conv -> conditional batch norm -> ReLU,
        # with `active_player` selecting the normalisation parameters.
        x = self.conv1(board_tensor)
        x = self.cbn1(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv2(x)
        x = self.cbn2(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv3(x)
        x = self.cbn3(x, active_player)
        x = tf.nn.relu(x)

        # Global average pooling
        x = tf.reduce_mean(x, axis=[1, 2])  # (batch_size, 256)

        # Fully connected layer with halfmove clock: the clock is appended as one
        # extra column to the fc1 activations before the output layer.
        x = tf.concat([self.fc1(x), tf.expand_dims(halfmove_clock, -1)], axis=1)
        output = self.fc2(x)

        return output

## CNN + VIT
This version uses a hybrid architecture that combines a convolutional network with a vision transformer (ViT). The added self-attention gives the model the ability to learn long-range piece relationships, and the architecture is highly scalable.

In [None]:
# Helper function to create patches for ViT
def create_patches(x, patch_size):
    """Split a channels-first board tensor into flattened, non-overlapping patches.

    Args:
        x: Tensor whose static shape is ``(batch, channels, 8, 8)``; the batch
            size may be unknown.
        patch_size: Side length of each square patch. It is also used as the
            stride, so patches do not overlap.

    Returns:
        Tensor of shape ``(batch, num_patches, patch_size * patch_size * channels)``
        with ``num_patches = (8 // patch_size) ** 2``.

    Raises:
        ValueError: If the two trailing dimensions of ``x`` are not 8 x 8.
    """
    # All three are read from the static shape; the batch size is left dynamic.
    channels, height, width = x.shape[1], x.shape[2], x.shape[3]

    # Ensure the input is in the expected shape
    if height != 8 or width != 8:
        raise ValueError("Input dimensions for chessboard must be (None, 13, 8, 8)")

    # extract_patches works on channels-last images, hence the transpose.
    patches = tf.image.extract_patches(
        images=tf.transpose(x, [0, 2, 3, 1]),
        sizes=[1, patch_size, patch_size, 1],
        strides=[1, patch_size, patch_size, 1],
        rates=[1, 1, 1, 1],
        padding='VALID'
    )

    # Collapse the grid of patches into a sequence: (batch, num_patches, patch_dim)
    patch_dim = patch_size * patch_size * channels
    num_patches = (height // patch_size) * (width // patch_size)

    # -1 lets the reshape infer the batch size at run time.
    patches = tf.reshape(patches, [-1, num_patches, patch_dim])

    return patches

class ViTBlock(layers.Layer):
    """Transformer encoder block: self-attention and a feed-forward network,
    each followed by a residual connection and layer normalisation."""

    def __init__(self, num_heads, embed_dim, ff_dim):
        """Create the sublayers.

        Args:
            num_heads: Number of attention heads.
            embed_dim: Passed as ``key_dim`` to the attention layer and used as
                the output width of the feed-forward network.
            ff_dim: Hidden width of the feed-forward network.
        """
        super(ViTBlock, self).__init__()
        # NOTE(review): `key_dim` is the size of each attention head, so every
        # head here is `embed_dim` wide rather than `embed_dim / num_heads` —
        # confirm this is intended.
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        """Apply the block to ``inputs`` and return a tensor produced by the
        second layer normalisation."""
        # Self-attention: the same tensor is used as query and value.
        attn_output = self.attention(inputs, inputs)
        # Residual connections are added before each normalisation.
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2

# Hybrid model definition (CNN + ViT)
from keras.saving import register_keras_serializable

@register_keras_serializable(package="ChessModel")
class ChessEvaluationHybridModel(Model):
    """Hybrid evaluator with a convolutional branch and a ViT branch.

    Both branches read the same board tensor. Their outputs are concatenated,
    passed through ``fc1``, joined with the halfmove clock and mapped to the
    final output by ``fc2``.

    NOTE(review): the Conv2D layers are created without ``data_format`` and so
    use the Keras default layout, while ``create_patches`` is handed the same
    ``board_tensor``. If the board tensor is channels-first and that default is
    channels_last, the convolutions treat the last axis as channels — confirm
    the intended layout. Changing it would alter the kernel shapes and therefore
    compatibility with existing checkpoints.
    """
    def __init__(self, num_piece_channels=13, num_classes=1, num_conditions=2, patch_size=2):
        """Build the layers and remember the constructor arguments for ``get_config``.

        Args:
            num_piece_channels: Channel count used to size the patch embedding.
            num_classes: Number of units of the output layer ``fc2``.
            num_conditions: Number of condition ids each ConditionalBatchNorm handles.
            patch_size: Side length of the square patches fed to the ViT branch.
        """
        super(ChessEvaluationHybridModel, self).__init__()

        # Stored so that get_config() can report them.
        self.num_piece_channels=num_piece_channels
        self.num_classes=num_classes
        self.num_conditions=num_conditions
        self.patch_size=patch_size

        # CNN layers
        self.conv1 = layers.Conv2D(64, kernel_size=3, padding='same')
        self.cbn1 = ConditionalBatchNorm(64, num_conditions)
        self.conv2 = layers.Conv2D(128, kernel_size=3, padding='same')
        self.cbn2 = ConditionalBatchNorm(128, num_conditions)
        self.conv3 = layers.Conv2D(256, kernel_size=3, padding='same')
        self.cbn3 = ConditionalBatchNorm(256, num_conditions)

        # ViT layers
        # NOTE(review): `patch_size` was already assigned above; this repeats it.
        self.patch_size = patch_size
        # One embedding entry per value of a flattened patch (52 with the defaults).
        self.embedding_dim = (patch_size * patch_size) * num_piece_channels
        self.vit_proj = layers.Dense(self.embedding_dim)  # Project patches into embedding space

        self.vit_block1 = ViTBlock(num_heads=4, embed_dim=self.embedding_dim, ff_dim=512)
        self.vit_block2 = ViTBlock(num_heads=4, embed_dim=self.embedding_dim, ff_dim=512)

        self.flatten = layers.Flatten()

        # Fully connected layers
        self.fc1 = layers.Dense(1024, activation='relu')
        self.fc2 = layers.Dense(num_classes)

    def call(self, inputs):
        """Run both branches on ``inputs`` and return the output of ``fc2``.

        ``inputs`` is a ``(board_tensor, active_player, halfmove_clock)`` triple.
        """
        board_tensor, active_player, halfmove_clock = inputs

        # CNN forward pass: each stage is conv -> conditional batch norm -> ReLU,
        # with `active_player` selecting the normalisation parameters.
        x = self.conv1(board_tensor)
        x = self.cbn1(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv2(x)
        x = self.cbn2(x, active_player)
        x = tf.nn.relu(x)

        x = self.conv3(x)
        x = self.cbn3(x, active_player)
        x = tf.nn.relu(x)

        # ViT forward pass: operates on the raw board tensor, not on the CNN features.
        patches = create_patches(board_tensor, self.patch_size)
        patches = self.vit_proj(patches)
        vit_out = self.vit_block1(patches)
        vit_out = self.vit_block2(vit_out)
        vit_out = tf.reduce_mean(vit_out, axis=1)  # Global average pooling for patches

        # Combine CNN and ViT outputs (the CNN feature map is flattened, not pooled).
        x = tf.concat([self.flatten(x), vit_out], axis=1)

        # Fully connected layers with halfmove clock: the clock is appended as one
        # extra column to the fc1 activations before the output layer.
        x = tf.concat([self.fc1(x), tf.expand_dims(halfmove_clock, -1)], axis=1)
        output = self.fc2(x)

        return output
    @classmethod
    def from_config(cls, config):
        """Rebuild the model from a config dict.

        Only the four constructor arguments are read; a missing key raises
        ``KeyError``.
        """
        # Manually pass in the parameters here
        return cls(
            num_piece_channels=config['num_piece_channels'],
            num_classes=config['num_classes'],
            num_conditions=config['num_conditions'],
            patch_size=config['patch_size']
        )

    def get_config(self):
        """Return the base config extended with the four constructor arguments."""
        config = super().get_config()
        # Include the custom arguments in the config dictionary
        config.update({
            'num_piece_channels': self.num_piece_channels,
            'num_classes': self.num_classes,
            'num_conditions': self.num_conditions,
            'patch_size': self.patch_size
        })
        return config

In [None]:
# Load the output of a previous notebook version through the Kaggle API.
# Start from an empty working directory: everything under /kaggle/working/ is deleted.
!rm -rf /kaggle/working/*

# Replace with your own Kaggle API key, provided as a Kaggle secret, when copying this notebook.
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
# The key is read from the secret labelled "Kaggle Api".
Api_key = user_secrets.get_secret("Kaggle Api")

import os
# Credentials are handed to the Kaggle CLI through environment variables.
# NOTE(review): the username is hard-coded; anyone copying the notebook must change it.
os.environ['KAGGLE_USERNAME'] = 'oussamahaboubi'
os.environ['KAGGLE_KEY'] = Api_key

def get_model(home_path="/kaggle/working/"):
    """Return the path of the most recently modified ``.keras`` file in a directory.

    Only names that END with ``.keras`` are considered, so files such as
    ``model.keras.tmp`` are ignored. When several model files exist the newest
    one (by modification time, ties broken by path) is chosen, which makes the
    result independent of the order in which the directory is listed.

    Args:
        home_path: Directory to search. Defaults to the Kaggle working directory.

    Returns:
        The path of the selected file, or ``None`` when the directory does not
        exist or holds no ``.keras`` file.
    """
    try:
        names = os.listdir(home_path)
    except FileNotFoundError:
        return None

    candidates = [
        os.path.join(home_path, name)
        for name in names
        if name.endswith(".keras")
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda path: (os.path.getmtime(path), path))

# Look for a model file that is already present in the working directory.
# NOTE(review): this cell starts by deleting everything under /kaggle/working/,
# so this first lookup is expected to find nothing unless files were added since.
saved_model=get_model()
if not saved_model:   
    # Nothing found: fetch the output files of the referenced Kaggle kernel into
    # /kaggle/working/ and look again. `saved_model` stays None if that also fails
    # to provide a model file.
    !kaggle kernels output oussamahaboubi/chess-evaluation-cnn-tensorflow -p /kaggle/working/
    saved_model=get_model()

# from tensorflow.keras.models import load_model
# print(saved_model)

# saved_model='/kaggle/working/checkpoint211120241650.model.keras'
# model = ChessEvaluationHybridModel()
# model = tf.keras.models.load_model(saved_model,custom_objects={'ChessEvaluationHybridModel': ChessEvaluationHybridModel})

# Create model with manual config
# model = ChessEvaluationHybridModel()
# model=tf.keras.models.load_model("/kaggle/working/21-11-2024 19:51.keras")

# Now load the weights into the new model
# model.load_weights(saved_model)

## Model Training

In [None]:
class LossHistory(tf.keras.callbacks.Callback):
    """Keras callback that records the training loss reported after every epoch.

    Attributes are documented inline; ``losses`` grows by one entry per epoch.
    """

    def __init__(self):
        super().__init__()
        # One entry per finished epoch, in order.
        self.losses = []

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's ``'loss'`` value (``None`` if it was not reported)."""
        # `logs` defaults to None, so guard before calling .get on it.
        logs = logs or {}
        self.losses.append(logs.get('loss'))

In [None]:
# Training hyper-parameters.
SAMPLE_SIZE = 1_000_000
EPOCHS = 200
BATCH_SIZE = 512

# Sample the evaluated positions and convert them to arrays.
boards, active_players, halfmove_clocks, evaluations = load_data(
    '/kaggle/input/chess-evaluations/random_evals.csv',
    sample_size=SAMPLE_SIZE,
)

# The model consumes a triple of inputs per example and predicts the evaluation.
inputs = (boards, active_players, halfmove_clocks)
targets = evaluations

# Build the input pipeline step by step: slice, shuffle, then batch.
dataset = tf.data.Dataset.from_tensor_slices((inputs, targets))
dataset = dataset.shuffle(buffer_size=2048)
dataset = dataset.batch(BATCH_SIZE)

# Callback instance that collects the per-epoch training loss.
loss_history = LossHistory()

In [None]:
from datetime import datetime
import tensorflow.keras as keras
from tensorflow.keras.callbacks import ModelCheckpoint


# The checkpoint callback keeps only the model with the lowest training loss so far.
checkpoint_filepath = "model.keras"  # alternative: a timestamped name per run
model_checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='loss',
    mode='min',
    save_best_only=True)

# Learning rate starts at 0.005 and is multiplied by 0.9 every 20000 optimizer steps.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.005,
    decay_steps=20000,
    decay_rate=0.9)

optimizer = optimizers.Adam(learning_rate=lr_schedule)

# Compile and train the model
model = ChessEvaluationHybridModel()
model.compile(optimizer=optimizer, loss='mse')

# Resume from a previous run only when a model file was actually found;
# `saved_model` is None otherwise and load_weights(None) would fail.
# NOTE(review): the model has not been called on any data at this point —
# confirm that load_weights works on it before its variables are created.
if saved_model:
    model.load_weights(saved_model)
else:
    print("No previous model file found; training starts from freshly initialised weights.")

model.fit(dataset, epochs=EPOCHS, callbacks=[loss_history, model_checkpoint_callback])


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Create a DataFrame with one row per recorded epoch. The epoch axis is derived
# from the number of recorded losses rather than from EPOCHS, so the cell also
# works when training stopped before all EPOCHS epochs were completed.
df = pd.DataFrame({
    'Epoch': range(1, len(loss_history.losses) + 1),
    'Loss': loss_history.losses,
})

# Plot the loss curve on a logarithmic y axis.
plt.plot(df['Epoch'], df['Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss (Logarithmic Scale)')
plt.yscale('log')
plt.grid(True)
plt.show()

In [None]:
from datetime import datetime
# Save the trained model under a day-month-year hour:minute:second timestamp.
now = f"{datetime.now():%d-%m-%Y %H:%M:%S}"
model.save('/kaggle/working/' + now + '.keras')

## Testing Saved Models
Latest Checkpoint
Using the latest checkpoint generated with `model.save`, which ended at a loss of around 24k and rising. Reassigning the optimizer restarts from a higher loss, around 230k, which then decreases quickly, but this is still not as bad as training the model from the ground up.

These losses suggest possible issues such as local minima or overfitting.

In [None]:
# Reload the latest saved model and continue training with a fresh optimizer.
model2 = tf.keras.models.load_model(
    "/kaggle/working/22-11-2024 00:41:15.keras",
    custom_objects={"ChessEvaluationHybridModel": ChessEvaluationHybridModel})

# Lower starting learning rate (1e-4) for the continued training.
lr_schedule2 = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.0001,
    decay_steps=20000,
    decay_rate=0.9)

# Use the schedule defined just above. The previous code passed `lr_schedule`,
# the 0.005 schedule of the original training run, leaving `lr_schedule2` unused.
optimizer2 = optimizers.Adam(learning_rate=lr_schedule2)

model2.compile(optimizer=optimizer2, loss='mse')
model2.fit(dataset, epochs=10)

In [None]:
# Reload a saved model and keep training it for 10 epochs.
# There is no compile() call here, so training presumably continues with whatever
# optimizer configuration load_model restored from the file — TODO confirm.
# NOTE(review): the file name differs by one second from the one loaded for
# `model2` (00:41:14 vs 00:41:15) — confirm both files exist.
model3=tf.keras.models.load_model("/kaggle/working/22-11-2024 00:41:14.keras", custom_objects={"ChessEvaluationHybridModel":ChessEvaluationHybridModel})

model3.fit(dataset, epochs=10)

## Best checkpoint
Using the callback checkpoint allows us to continue training from the lowest-loss point of the model. However, this still runs into the issue of increasing loss.

In [None]:
# Reload the file written by the checkpoint callback and train for 10 more epochs.
# There is no compile() call here, so training presumably continues with whatever
# optimizer configuration load_model restored from the file — TODO confirm.
model4=tf.keras.models.load_model("/kaggle/working/model.keras", custom_objects={"ChessEvaluationHybridModel":ChessEvaluationHybridModel})

model4.fit(dataset, epochs=10)

In [None]:
# Reload the file written by the checkpoint callback and continue training with
# a fresh optimizer.
model5 = tf.keras.models.load_model(
    "/kaggle/working/model.keras",
    custom_objects={"ChessEvaluationHybridModel": ChessEvaluationHybridModel})

# Lower starting learning rate (1e-4) for the continued training.
lr_schedule3 = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.0001,
    decay_steps=20000,
    decay_rate=0.9)

# Use the schedule defined just above. The previous code passed `lr_schedule`,
# the 0.005 schedule of the original training run, leaving `lr_schedule3` unused.
optimizer3 = optimizers.Adam(learning_rate=lr_schedule3)

model5.compile(optimizer=optimizer3, loss='mse')
model5.fit(dataset, epochs=5)

# Modelo Supervisado para Predecir error_label
Este notebook entrena un modelo de clasificación para predecir el tipo de error cometido en una jugada de ajedrez usando los features generados en `training_dataset.csv`.

In [None]:
import pandas as pd

# Load the dataset (path is relative to the notebook's working directory).
df = pd.read_csv("training_dataset.csv")

# Initial inspection: size, column names and how many rows each error label has.
print(df.shape)
print(df.columns)
print(df['error_label'].value_counts())
df.head()


In [None]:
from sklearn.model_selection import train_test_split

# Feature selection: the columns used as model inputs.
# NOTE(review): if `error_label` was derived from `score_diff` thresholds, keeping
# `score_diff` as a feature leaks the target into the inputs — confirm.
features = [
    'score_diff', 'material_total', 'material_balance', 'num_pieces',
    'branching_factor', 'self_mobility', 'opponent_mobility',
    'phase', 'has_castling_rights', 'is_low_mobility', 
    'is_center_controlled', 'is_pawn_endgame'
]

X = df[features]
y = df['error_label']

# Data split: 80% train / 20% test, seeded for reproducibility.
# NOTE(review): the split is not stratified by label; consider `stratify=y` if
# the label counts printed above are imbalanced.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Training: a depth-limited decision tree, seeded for reproducibility.
clf = DecisionTreeClassifier(max_depth=5, random_state=42)
clf.fit(X_train, y_train)

# Evaluation on the held-out test split: confusion matrix and per-class metrics.
y_pred = clf.predict(X_test)
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))


In [None]:
import joblib

# Save the trained model to the working directory.
# NOTE: joblib files are pickle-based; only load this file from trusted sources.
joblib.dump(clf, 'trained_error_label_model.pkl')


## Comparación con otros modelos (opcional)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

# Candidate classifiers to compare. The tree-based estimators are seeded with the
# same value used for the decision tree trained earlier, so the reported scores
# are reproducible between runs.
models = {
    "Decision Tree": DecisionTreeClassifier(max_depth=5, random_state=42),
    "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42),
    "Logistic Regression": LogisticRegression(max_iter=1000)
}

# 5-fold cross-validated accuracy on the full feature matrix.
# The loop variable is not called `model` so that a global of that name used by
# other cells is not silently rebound.
for name, candidate in models.items():
    scores = cross_val_score(candidate, X, y, cv=5)
    print(f"{name} accuracy: {scores.mean():.4f}")


# PCA + Clustering en Partidas de Ajedrez
Este notebook aplica PCA a posiciones vectorizadas y agrupa los errores en clusters.

In [None]:
import sys
import os
import numpy as np

# `from src.<module> import ...` only resolves when the directory that CONTAINS
# `src` is on sys.path (adding `src` itself is not enough). Walk upwards from the
# working directory until a directory holding a `src` folder is found.
project_root = os.path.abspath(os.getcwd())
while not os.path.isdir(os.path.join(project_root, 'src')):
    parent_dir = os.path.dirname(project_root)
    if parent_dir == project_root:
        # Reached the filesystem root without finding `src`; the imports below
        # will then raise ModuleNotFoundError.
        break
    project_root = parent_dir

src_path = os.path.join(project_root, 'src')
if os.path.isdir(src_path):
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
    # Kept for backward compatibility: `src` itself was previously added too.
    if src_path not in sys.path:
        sys.path.append(src_path)

from src.extractor import extract_features_from_fen
from src.reducer import apply_pca
from src.cluster import cluster_points
from src.utils import load_pgn_positions
import matplotlib.pyplot as plt

ModuleNotFoundError: No module named 'src'

In [None]:
# Load positions and turn each FEN into a feature vector (one row of X per position).
# NOTE(review): the path ends in `.png` although the loader is named
# `load_pgn_positions` — possibly a typo for `data/game.pgn`; confirm the file name.
fens = load_pgn_positions('data/game.png')
X = np.array([extract_features_from_fen(f) for f in fens])

In [None]:
# Apply PCA with two components; `Z` is later plotted column 0 against column 1.
pca, Z = apply_pca(X, n_components=2)

In [None]:
# Clustering of the projected points; `labels` colours the scatter plot below.
# NOTE(review): this rebinds the global name `model`, which other cells may use
# for a different object.
labels, model = cluster_points(Z)

In [None]:
# Visualisation: scatter of the two principal components, coloured by cluster label.
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(Z[:, 0], Z[:, 1], c=labels, cmap='viridis', s=30)
ax.set_title('Clusters de posiciones en PCA')
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.grid(True)
plt.show()

In [None]:
import os
import chess
import chess.engine
import dotenv
# Load variables from a .env file (if any) into the process environment.
env = dotenv.load_dotenv()

# Path of the Stockfish executable. The fallback previously contained a stray
# typographic quote and pointed at a directory instead of a binary.
# NOTE(review): the fallback assumes the binary is named `stockfish` — confirm.
STOCKFISH_PATH = os.environ.get("STOCKFISH_PATH", "/usr/local/bin/stockfish")

with chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH) as engine:
    board = chess.Board("rnbqkb1r/pppp2pp/5n2/4pP2/8/5N2/PPPP1PPP/RNBQKB1R w KQkq - 1 4")
    # With multipv set, analyse() returns a list with one entry per line.
    info = engine.analyse(board, chess.engine.Limit(depth=10), multipv=3)
    print(type(info))  # → <class 'list'>
    # Print every returned score instead of indexing exactly three entries, so a
    # position with fewer than three lines does not raise IndexError.
    print(*(line["score"] for line in info))

<class 'list'>
PovScore(Cp(+118), WHITE) PovScore(Cp(+105), WHITE) PovScore(Cp(+91), WHITE)
