In [None]:
!pip install chess
!pip install torch
!pip install numpy
!pip install scikit-learn
!pip install pandas
!pip install torchviz

Collecting torchviz
  Downloading torchviz-0.0.3-py3-none-any.whl.metadata (2.1 kB)
Downloading torchviz-0.0.3-py3-none-any.whl (5.7 kB)
Installing collected packages: torchviz
Successfully installed torchviz-0.0.3


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import chess
# Utility: convert FEN to tensor with positional features
def fen_to_tensor(fen):
    """Encode a FEN position as a ``(23, 8, 8)`` float32 tensor.

    Row 0 of every plane is rank 8 and column 0 is the a-file
    (``row = 7 - square // 8``, ``col = square % 8``).

    Channel layout (23 = 12 piece + 1 side-to-move + 10 feature planes):
        0-5    white P, N, B, R, Q, K
        6-11   black P, N, B, R, Q, K
        12     ``float(board.turn)`` broadcast over the board
        13     squares attacked by at least one white piece
        14     squares attacked by at least one black piece
        15     destination squares of the legal moves
        16     ``chess.square_distance`` to the white king (0 if absent)
        17     ``chess.square_distance`` to the black king (0 if absent)
        18     ``float(board.is_check())`` broadcast over the board
        19     squares holding a piece pinned to its own king
        20     squares with more white than black attackers
        21     squares with more black than white attackers
        22     destination squares of legal moves that give check

    Args:
        fen: FEN string accepted by ``chess.Board``.

    Returns:
        ``torch.Tensor`` of shape ``(23, 8, 8)`` and dtype float32.
    """
    board = chess.Board(fen)
    piece_channel = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5}

    def row_col(square):
        # Rank 8 maps to row 0 and the a-file to column 0.
        return 7 - (square // 8), square % 8

    # Piece planes: 6 white channels followed by 6 black channels.
    planes = np.zeros((12, 8, 8), dtype=np.float32)
    for sq, piece in board.piece_map().items():
        r, c = row_col(sq)
        color_offset = 0 if piece.color == chess.WHITE else 6
        planes[piece_channel[piece.symbol().upper()] + color_offset, r, c] = 1

    # Constant planes describing the position as a whole.
    stm_plane = np.full((1, 8, 8), float(board.turn), dtype=np.float32)
    check_pl = np.full((8, 8), float(board.is_check()), dtype=np.float32)

    attack_w = np.zeros((8, 8), dtype=np.float32)
    attack_b = np.zeros((8, 8), dtype=np.float32)
    controlled_white = np.zeros((8, 8), dtype=np.float32)
    controlled_black = np.zeros((8, 8), dtype=np.float32)
    dist_wk = np.zeros((8, 8), dtype=np.float32)
    dist_bk = np.zeros((8, 8), dtype=np.float32)
    pinned = np.zeros((8, 8), dtype=np.float32)

    wksq = board.king(chess.WHITE)
    bksq = board.king(chess.BLACK)

    # Single pass over the squares; the attacker sets are computed once and
    # feed both the attack maps and the control maps.
    for sq in chess.SQUARES:
        r, c = row_col(sq)

        white_attackers = len(board.attackers(chess.WHITE, sq))
        black_attackers = len(board.attackers(chess.BLACK, sq))
        if white_attackers:
            attack_w[r, c] = 1
        if black_attackers:
            attack_b[r, c] = 1
        if white_attackers > black_attackers:
            controlled_white[r, c] = 1.0
        elif black_attackers > white_attackers:
            controlled_black[r, c] = 1.0

        if wksq is not None:
            dist_wk[r, c] = chess.square_distance(sq, wksq)
        if bksq is not None:
            dist_bk[r, c] = chess.square_distance(sq, bksq)

        piece = board.piece_at(sq)
        if piece is not None and board.is_pinned(piece.color, sq):
            pinned[r, c] = 1

    # Single pass over the legal moves. The move list is snapshotted first
    # because the board is mutated (push/pop) inside the loop.
    legal_mask = np.zeros((8, 8), dtype=np.float32)
    checking_moves_mask = np.zeros((8, 8), dtype=np.float32)
    for mv in list(board.legal_moves):
        r, c = row_col(mv.to_square)
        legal_mask[r, c] = 1
        board.push(mv)
        if board.is_check():
            checking_moves_mask[r, c] = 1
        board.pop()

    # Stack all planes: 12 + 1 + 10 = 23 channels (must match the Conv2d input).
    extra = [attack_w, attack_b, legal_mask, dist_wk, dist_bk, check_pl, pinned,
             controlled_white, controlled_black, checking_moves_mask]
    feature_planes = np.stack(extra, axis=0)
    all_planes = np.concatenate([planes, stm_plane, feature_planes], axis=0)

    return torch.from_numpy(all_planes)


In [4]:
# --- Hyperparameters & Constants ---
EPOCHS = 15          # Number of epochs run by train_binary
BATCH_SIZE = 512          # Number of samples per batch
MAX_BATCHES_PER_EPOCH = 10000  # Upper bound on training batches per epoch (default `max_batches` of train_binary)
LR = 1e-2                 # Learning rate passed to Adam
BINARY_CLASSES = 2        # Number of output logits of IsMateCNN
DATA_PATH = 'data/trainingpuzzles.csv'
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# NOTE(review): `...` (Ellipsis) is a placeholder for the dataset, so this
# loader cannot be iterated; `train_loader` is reassigned with a real dataset
# in the training cell before it is used.
train_loader = DataLoader(..., batch_size=32, num_workers=2, pin_memory=False)

# Limit PyTorch intra-op CPU threads
torch.set_num_threads(4)

# TODO: precompute the FEN tensors once outside of training, save them to
# disk, and load them during training instead of re-encoding every epoch.

class NumberMateCNN(nn.Module):
    """CNN that maps a (23, 8, 8) board encoding to one score in [0, 1].

    The training code regresses this score against the reciprocal of the
    mate distance. Layer positions inside ``conv`` and ``fc`` are kept
    stable so that saved state dicts keep loading.
    """

    def __init__(self):
        super().__init__()

        # Two 2x2 poolings shrink the 8x8 board to 2x2 with 128 channels.
        feature_layers = [
            nn.Conv2d(23, 32, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.conv = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Flatten(),
            nn.Linear(128 * 2 * 2, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 1),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return one sigmoid-squashed value per sample, shape ``(batch,)``."""
        features = self.conv(x)
        raw = self.fc(features).squeeze(-1)
        # outputs in [0,1]
        return torch.sigmoid(raw)

# Binary classification model
class IsMateCNN(nn.Module):
    """CNN classifier over (23, 8, 8) board encodings.

    Emits ``BINARY_CLASSES`` raw logits per sample (no softmax); the
    training code feeds them to ``nn.CrossEntropyLoss``. Layer positions
    inside ``conv`` and ``fc`` are kept stable so that saved state dicts
    keep loading.
    """

    def __init__(self):
        super().__init__()

        # Two 2x2 poolings shrink the 8x8 board to 2x2 with 128 channels.
        feature_layers = [
            nn.Conv2d(23, 32, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.conv = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Flatten(),
            nn.Linear(128 * 2 * 2, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, BINARY_CLASSES),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, BINARY_CLASSES)``."""
        features = self.conv(x)
        return self.fc(features)

import json

def train_binary(model, train_loader, val_loader, device, save_path='binary_model.pt', max_batches=MAX_BATCHES_PER_EPOCH):
    """Train a classifier with cross-entropy and validate after every epoch.

    Runs ``EPOCHS`` epochs of Adam (learning rate ``LR``), processing at most
    ``max_batches`` training batches per epoch.

    Args:
        model: ``nn.Module`` returning class logits of shape (batch, classes).
        train_loader: iterable of ``(x, y)`` training batches.
        val_loader: iterable of ``(x, y)`` validation batches.
        device: device the model and batches are moved to.
        save_path: prefix for the written files. A checkpoint is saved to
            ``f"{save_path}_epoch{n}.pt"`` after each epoch and the metrics to
            ``f"{save_path}_metrics.json"``. The prefix is used verbatim, so
            the default yields names such as ``binary_model.pt_epoch1.pt``.
        max_batches: cap on training batches per epoch.

    Returns:
        None. Results are printed and written to disk.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)
    model.to(device)

    avg_train_losses = []
    avg_val_losses = []
    val_accuracies = []

    for epoch in range(EPOCHS):
        print(f"Training epoch {epoch}...")
        model.train()
        total_loss = 0.0
        samples_seen = 0

        for batch_count, (x, y) in enumerate(train_loader):
            if batch_count >= max_batches:
                break
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits, y)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a short last batch does not skew the mean.
            total_loss += loss.item() * x.size(0)
            samples_seen += x.size(0)
            if batch_count % 25 == 0:
                print(f"Running batch {batch_count}")

        # An empty loader (or max_batches == 0) yields NaN instead of a
        # ZeroDivisionError.
        avg_train_loss = total_loss / samples_seen if samples_seen else float('nan')
        avg_train_losses.append(avg_train_loss)

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val, y_val = x_val.to(device), y_val.to(device)
                logits = model(x_val)
                loss = criterion(logits, y_val)
                val_loss += loss.item() * x_val.size(0)
                preds = logits.argmax(dim=1)
                correct += (preds == y_val).sum().item()
                total += y_val.size(0)

        avg_val_loss = val_loss / total if total else float('nan')
        val_accuracy = correct / total if total else float('nan')

        avg_val_losses.append(avg_val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1} Train Loss: {avg_train_loss:.4f} Val Loss: {avg_val_loss:.4f} Val Acc: {val_accuracy:.4f}")

        torch.save(model.state_dict(), f"{save_path}_epoch{epoch+1}.pt")

    # Save metrics to JSON file
    with open(f"{save_path}_metrics.json", 'w') as f:
        json.dump({
            'train_loss': avg_train_losses,
            'val_loss': avg_val_losses,
            'val_accuracy': val_accuracies
        }, f)

def validate_distance(model, loader, device, criterion):
    """Return the sample-weighted mean of ``criterion`` over ``loader``.

    Each batch is ``(x, t)`` where ``t`` is the mate distance. The model
    output is compared against ``1 / t`` where ``t > 0`` and against 0
    elsewhere. Raises ``ZeroDivisionError`` if the loader yields no samples.
    """
    model.eval()
    loss_sum = 0
    n_seen = 0
    with torch.no_grad():
        for inputs, mate_dist in loader:
            inputs = inputs.to(device)
            mate_dist = mate_dist.to(device)
            # Reciprocal target; non-positive distances map to 0.
            target = torch.where(mate_dist > 0, 1.0 / mate_dist, torch.zeros_like(mate_dist))
            batch_n = inputs.size(0)
            batch_loss = criterion(model(inputs), target)
            loss_sum += batch_loss.item() * batch_n
            n_seen += batch_n
    return loss_sum / n_seen


import json
import torch
import torch.nn as nn
import torch.optim as optim

def train_distance(model, train_loader, val_loader, device, save_path='distance_model.pt'):
    """Train a mate-distance regressor with MSE on reciprocal targets.

    Batches are ``(x, t)`` with ``t`` the mate distance. The model is fitted
    to ``1 / t`` where ``t > 0`` and to 0 elsewhere. Always runs 30 epochs
    (a local setting, independent of the module-level ``EPOCHS``) with Adam
    at learning rate ``LR``.

    Args:
        model: ``nn.Module`` returning one value per sample, shape (batch,).
        train_loader: iterable of ``(x, t)`` training batches.
        val_loader: iterable of ``(x, t)`` validation batches.
        device: device the model and batches are moved to.
        save_path: prefix, used verbatim, for ``f"{save_path}_epoch{n}.pt"``
            checkpoints and ``f"{save_path}_metrics.json"``.

    Returns:
        None. Results are printed and written to disk.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)
    model.to(device)

    n_epochs = 30
    error_threshold = 1  # acceptable error threshold in mate moves

    avg_train_losses = []
    avg_val_losses = []
    val_accuracies = []
    val_mean_abs_errors = []

    def _mean(total, count):
        # NaN rather than ZeroDivisionError when a loader yields nothing.
        return total / count if count else float('nan')

    for epoch in range(n_epochs):
        print(f"Training epoch {epoch}...")
        model.train()
        total_train_loss = 0.0
        train_samples = 0
        for x, t in train_loader:
            x, t = x.to(device), t.to(device)
            # target: inverse mate distance (1/mate_moves), 0 if no mate
            t_recip = torch.where(t > 0, 1.0 / t, torch.zeros_like(t))
            optimizer.zero_grad()
            pred = model(x)
            loss = criterion(pred, t_recip)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item() * x.size(0)
            train_samples += x.size(0)
        # Normalise by the samples actually processed: this matches
        # train_binary and does not require the loader to expose `.dataset`.
        avg_train_loss = _mean(total_train_loss, train_samples)
        avg_train_losses.append(avg_train_loss)

        # --- Validation ---
        model.eval()
        val_loss = 0.0
        abs_error_sum = 0.0
        correct_within_threshold = 0
        total = 0

        with torch.no_grad():
            for x_val, t_val in val_loader:
                x_val, t_val = x_val.to(device), t_val.to(device)
                t_recip_val = torch.where(t_val > 0, 1.0 / t_val, torch.zeros_like(t_val))
                pred_val = model(x_val)

                loss_val = criterion(pred_val, t_recip_val)
                val_loss += loss_val.item() * x_val.size(0)

                # Convert predictions back to mate moves; 1000 stands in for
                # "no mate" when the prediction is not positive.
                # NOTE(review): rows with t == 0 are included in the move-error
                # metrics below, so they compare a large value against 0 —
                # confirm whether those rows should be excluded.
                pred_moves = torch.where(pred_val > 0, 1.0 / pred_val, torch.full_like(pred_val, 1000.0))
                abs_move_errors = torch.abs(pred_moves - t_val)

                abs_error_sum += abs_move_errors.sum().item()
                correct_within_threshold += (abs_move_errors <= error_threshold).sum().item()
                total += t_val.size(0)

        avg_val_loss = _mean(val_loss, total)
        avg_val_losses.append(avg_val_loss)

        mean_abs_error = _mean(abs_error_sum, total)
        val_mean_abs_errors.append(mean_abs_error)

        val_accuracy = _mean(correct_within_threshold, total)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}: Train Loss={avg_train_loss:.4f}, Val Loss={avg_val_loss:.4f}, Val Acc={val_accuracy:.4f}, Mean Abs Error={mean_abs_error:.4f} moves")

        torch.save(model.state_dict(), f"{save_path}_epoch{epoch+1}.pt")

    # Save metrics to JSON
    with open(f"{save_path}_metrics.json", 'w') as f:
        json.dump({
            'train_loss': avg_train_losses,
            'val_loss': avg_val_losses,
            'val_accuracy': val_accuracies,
            'val_mean_abs_error': val_mean_abs_errors
        }, f)





In [None]:
# Load the binary classifier checkpoint written after epoch 16.

# Build the architecture the checkpoint was saved from
model = IsMateCNN() # file name suggests the binary (mate / no mate) model

# Load the saved state dictionary, remapping tensors onto DEVICE.
# 'binary_model_epoch16.pt' must exist in the working directory, otherwise
# torch.load raises FileNotFoundError.
model.load_state_dict(torch.load('binary_model_epoch16.pt', map_location=DEVICE))

# Move the model to the selected device (CPU or GPU)
model.to(DEVICE)

# Evaluation mode: disables dropout for inference
model.eval()
# Cell output: total number of parameters
sum(p.numel() for p in model.parameters())

FileNotFoundError: [Errno 2] No such file or directory: 'binary_model_epoch16.pt'

In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import train_test_split
import pandas as pd
from collections import Counter

# --- Constants ---
# NOTE: these rebind the module-level names set in the hyperparameter cell
# (there: EPOCHS = 15, BATCH_SIZE = 512, LR = 1e-2). Functions that read
# EPOCHS / LR at call time pick up the values below.
EPOCHS = 4
BATCH_SIZE = 512*2
LR = 1e-3
BINARY_CLASSES = 2
DATA_PATH = 'data/trainingpuzzles.csv'
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class ChessPuzzleBinaryDataset(Dataset):
    """Puzzle dataset backed by a CSV with ``FEN`` and ``Number`` columns.

    ``Number`` is the mate distance, with 0 meaning "no mate". Two modes are
    selected by ``task`` (the attribute may be switched after construction):

    * ``'binary'``   -> target is the class label ``Number != 0`` (long).
    * anything else  -> target is ``Number`` itself (float32).

    When constructed with ``task='binary'`` the rows are down-sampled so
    that both classes have the size of the smaller one.

    Args:
        csv_file: path of the CSV file.
        task: ``'binary'`` or ``'distance'``.
        random_state: optional seed for the balancing sample; ``None`` (the
            default) keeps the draw non-deterministic.
    """

    def __init__(self, csv_file, task='binary', random_state=None):
        self.df = pd.read_csv(csv_file)

        self.task = task  # 'binary' or 'distance'

        if self.task == 'binary':
            # Create binary label: 0 if number==0, else 1
            self.df['distance'] = self.df['Number'].astype(int)
            self.df['label'] = (self.df['Number'] != 0).astype(int)

            # Balance classes by downsampling to the smallest class size.
            # GroupBy.sample keeps the grouping column, whereas
            # groupby(...).apply(...) is deprecated for frames that include it.
            min_class_size = int(self.df['label'].value_counts().min())
            self.df = (
                self.df.groupby('label')
                .sample(n=min_class_size, replace=False, random_state=random_state)
                .reset_index(drop=True)
            )
            print(f"Balanced classes count: {self.df['label'].value_counts().to_dict()}")

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(board_tensor, target)`` for row ``idx``."""
        row = self.df.iloc[idx]
        x = fen_to_tensor(row['FEN'])

        if self.task == 'binary':
            y = torch.tensor(row['label'], dtype=torch.long)
        else:
            # Distance regression target
            y = torch.tensor(int(row['Number']), dtype=torch.float32)

        return x, y

# --- Load and split dataset ---
full_ds = ChessPuzzleBinaryDataset(DATA_PATH, task='binary')

labels = full_ds.df['label'].values  # stratify needs labels array
indices = list(range(len(full_ds)))

train_idx, val_idx = train_test_split(
    indices,
    test_size=0.1,
    random_state=42,
    stratify=labels
)

train_loader = DataLoader(Subset(full_ds, train_idx), batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader   = DataLoader(Subset(full_ds, val_idx), batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

# Label balance of the validation split. The labels are read straight from
# the dataframe: iterating val_loader would encode every validation FEN just
# to throw the tensors away.
val_labels = labels[val_idx].tolist()

print("Validation label distribution:", Counter(val_labels))

# --- Train binary classification model ---
binary_model = IsMateCNN()
train_binary(binary_model, train_loader, val_loader, DEVICE, save_path='binary_model')

# --- Switch dataset mode to distance regression ---
# `task` is read on every __getitem__ call, so the existing val_loader (which
# wraps the same dataset object) yields distance targets from here on too.
full_ds.task = 'distance'
train_loader = DataLoader(Subset(full_ds, train_idx), batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

# --- Train mate-distance regression model ---
distance_model = NumberMateCNN()
train_distance(distance_model, train_loader, val_loader, DEVICE, save_path='distance_model')


  .apply(lambda x: x.sample(min_class_size, replace=False))


Balanced classes count: {0: 270000, 1: 270000}




In [None]:
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
import pandas as pd
from sklearn.model_selection import train_test_split

# Assume fen_to_tensor(fen) is defined elsewhere and converts a FEN string to a (C,H,W) tensor

def flip_fen_horizontal(fen):
    """Mirror a FEN position left-to-right (a-file <-> h-file).

    * Piece placement: every rank string is reversed. Run-length digits are
      single characters, so a plain character reversal is correct.
    * Castling availability: reset to ``-``. After mirroring, the kings no
      longer stand on their original files, so the original flags would not
      describe the mirrored position.
    * En-passant target: the file letter is mirrored, the rank is kept.
    * Side to move and the move counters are unchanged.

    Fields that are absent (e.g. a FEN holding only the piece placement) are
    left absent.

    Args:
        fen: FEN string with space-separated fields.

    Returns:
        The mirrored FEN string.
    """
    files = 'abcdefgh'
    fields = fen.split(' ')

    fields[0] = '/'.join(rank[::-1] for rank in fields[0].split('/'))

    if len(fields) > 2:
        fields[2] = '-'

    if len(fields) > 3:
        ep = fields[3]
        if len(ep) == 2 and ep[0] in files:
            fields[3] = files[7 - files.index(ep[0])] + ep[1]

    return ' '.join(fields)

class ChessPuzzleBinaryDataset(Dataset):
    """Class-balanced puzzle dataset with horizontal-mirror augmentation.

    Reads a CSV with ``FEN`` and ``Number`` columns (``Number`` is the mate
    distance, 0 meaning "no mate"). When constructed with ``task='binary'``:

    1. a ``label`` column (``Number != 0``) is added,
    2. both classes are down-sampled to the size of the smaller one,
    3. a mirrored copy of every remaining row is appended.

    Row layout after step 3: the ``n`` balanced rows come first, followed by
    their ``n`` mirrored copies in the same order, so rows ``i`` and
    ``i + n`` show the same puzzle. A train/validation split should keep
    such pairs on the same side.

    Args:
        csv_file: path of the CSV file.
        task: ``'binary'`` (long class label) or anything else (float32
            ``Number`` target).
        random_state: optional seed for the balancing sample; ``None`` (the
            default) keeps the draw non-deterministic.
    """

    def __init__(self, csv_file, task='binary', random_state=None):
        self.df = pd.read_csv(csv_file)

        self.task = task

        if self.task == 'binary':
            self.df['label'] = (self.df['Number'] != 0).astype(int)

            # Balance classes by downsampling to the smallest class size.
            # GroupBy.sample keeps the grouping column, whereas
            # groupby(...).apply(...) is deprecated for frames that include it.
            min_class_size = int(self.df['label'].value_counts().min())
            self.df = (
                self.df.groupby('label')
                .sample(n=min_class_size, replace=False, random_state=random_state)
                .reset_index(drop=True)
            )
            print(f"Balanced classes count: {self.df['label'].value_counts().to_dict()}")

            # Create flipped versions explicitly
            flipped_df = self.df.copy()
            flipped_df['FEN'] = flipped_df['FEN'].apply(flip_fen_horizontal)
            # The label stays the same for flipped boards
            self.df = pd.concat([self.df, flipped_df], ignore_index=True)
            print(f"Dataset size after adding flipped: {len(self.df)}")

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(board_tensor, target)`` for row ``idx``."""
        row = self.df.iloc[idx]
        if self.task == 'binary':
            y = torch.tensor(row['label'], dtype=torch.long)
        else:
            y = torch.tensor(row['Number'], dtype=torch.float32)
        x = fen_to_tensor(row['FEN'])
        return x, y


def train_binary(model, train_loader, val_loader, device, save_path='binary_model', start_epoch=0, max_batches=None):
    """Train (or resume training) a classifier with cross-entropy.

    Runs ``EPOCHS`` further epochs of Adam (learning rate ``LR``), numbered
    from ``start_epoch``. After every epoch the weights are saved to
    ``f"{save_path}_epoch{n}.pt"``; at the end the metrics are written to
    ``f"{save_path}_metrics.json"`` (appended to the existing file when
    resuming).

    Args:
        model: ``nn.Module`` returning class logits of shape (batch, classes).
        train_loader: iterable of ``(x, y)`` training batches.
        val_loader: iterable of ``(x, y)`` validation batches.
        device: device the model and batches are moved to.
        save_path: prefix of the checkpoint and metrics files.
        start_epoch: if > 0, weights are restored from
            ``f"{save_path}_epoch{start_epoch}.pt"`` when that file exists.
            Only model weights are restored; the optimizer starts fresh.
        max_batches: optional cap on training batches per epoch; ``None``
            or 0 means no cap.

    Returns:
        None. Results are printed and written to disk.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)
    model.to(device)

    if start_epoch > 0:
        load_path = f"{save_path}_epoch{start_epoch}.pt"
        if os.path.exists(load_path):
            print(f"Loading model state from {load_path}")
            # map_location lets a checkpoint saved on GPU load on a CPU-only host.
            model.load_state_dict(torch.load(load_path, map_location=device))
        else:
            print(f"Warning: State dict not found at {load_path}. Starting fresh.")

    avg_train_losses = []
    avg_val_losses = []
    val_accuracies = []

    for epoch in range(start_epoch, start_epoch + EPOCHS):
        print(f"Training epoch {epoch}...")
        model.train()
        total_loss = 0.0
        samples_seen = 0

        for batch_count, (x, y) in enumerate(train_loader):
            if max_batches and batch_count >= max_batches:
                break
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits, y)
            loss.backward()
            optimizer.step()

            # Weight by batch size so a short last batch does not skew the mean.
            total_loss += loss.item() * x.size(0)
            samples_seen += x.size(0)

            if batch_count % 25 == 0:
                print(f"Running batch {batch_count}")

        # NaN rather than ZeroDivisionError when the loader yields nothing.
        avg_train_loss = total_loss / samples_seen if samples_seen else float('nan')
        avg_train_losses.append(avg_train_loss)

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val, y_val = x_val.to(device), y_val.to(device)
                logits = model(x_val)
                loss = criterion(logits, y_val)
                val_loss += loss.item() * x_val.size(0)
                preds = logits.argmax(dim=1)
                correct += (preds == y_val).sum().item()
                total += y_val.size(0)

        avg_val_loss = val_loss / total if total else float('nan')
        val_accuracy = correct / total if total else float('nan')
        avg_val_losses.append(avg_val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1} Train Loss: {avg_train_loss:.4f} Val Loss: {avg_val_loss:.4f} Val Acc: {val_accuracy:.4f}")

        torch.save(model.state_dict(), f"{save_path}_epoch{epoch+1}.pt")

    # Save metrics, merge if resuming
    metrics_file = f"{save_path}_metrics.json"
    if os.path.exists(metrics_file) and start_epoch > 0:
        with open(metrics_file, 'r') as f:
            existing = json.load(f)
        avg_train_losses = existing.get('train_loss', []) + avg_train_losses
        avg_val_losses = existing.get('val_loss', []) + avg_val_losses
        val_accuracies = existing.get('val_accuracy', []) + val_accuracies

    with open(metrics_file, 'w') as f:
        json.dump({
            'train_loss': avg_train_losses,
            'val_loss': avg_val_losses,
            'val_accuracy': val_accuracies
        }, f)


def train_distance(model, train_loader, val_loader, device, save_path='distance_model', start_epoch=0):
    """Train (or resume training) a mate-distance regressor with MSE.

    Batches are ``(x, t)`` with ``t`` the mate distance. The model is fitted
    to ``1 / t`` where ``t > 0`` and to 0 elsewhere. Runs ``EPOCHS`` further
    epochs numbered from ``start_epoch``, using Adam with a fixed learning
    rate of 1e-4 (the module-level ``LR`` is not used here).

    After every epoch the weights are saved to
    ``f"{save_path}_epoch{n}.pt"``; at the end the metrics are written to
    ``f"{save_path}_metrics.json"`` (appended to the existing file when
    resuming).

    Args:
        model: ``nn.Module`` returning one value per sample, shape (batch,).
        train_loader: iterable of ``(x, t)`` training batches.
        val_loader: iterable of ``(x, t)`` validation batches.
        device: device the model and batches are moved to.
        save_path: prefix of the checkpoint and metrics files.
        start_epoch: if > 0, weights are restored from
            ``f"{save_path}_epoch{start_epoch}.pt"`` when that file exists.
            Only model weights are restored; the optimizer starts fresh.

    Returns:
        None. Results are printed and written to disk.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    model.to(device)

    if start_epoch > 0:
        load_path = f"{save_path}_epoch{start_epoch}.pt"
        if os.path.exists(load_path):
            print(f"Loading model state from {load_path}")
            # map_location lets a checkpoint saved on GPU load on a CPU-only host.
            model.load_state_dict(torch.load(load_path, map_location=device))
        else:
            print(f"Warning: State dict not found at {load_path}. Starting fresh.")

    avg_train_losses = []
    avg_val_losses = []
    val_accuracies = []
    val_mean_abs_errors = []

    error_threshold = 1  # moves

    def _mean(total, count):
        # NaN rather than ZeroDivisionError when a loader yields nothing.
        return total / count if count else float('nan')

    for epoch in range(start_epoch, start_epoch + EPOCHS):
        print(f"Training epoch {epoch}...")
        model.train()
        total_train_loss = 0.0
        samples_seen = 0

        for x, t in train_loader:
            x, t = x.to(device), t.to(device)
            # Reciprocal target; non-positive distances map to 0.
            t_recip = torch.where(t > 0, 1.0 / t, torch.zeros_like(t))
            optimizer.zero_grad()
            pred = model(x)
            loss = criterion(pred, t_recip)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item() * x.size(0)
            samples_seen += x.size(0)

        avg_train_loss = _mean(total_train_loss, samples_seen)
        avg_train_losses.append(avg_train_loss)

        model.eval()
        val_loss = 0.0
        abs_error_sum = 0.0
        correct_within_threshold = 0
        total_val_samples = 0

        with torch.no_grad():
            for x_val, t_val in val_loader:
                x_val, t_val = x_val.to(device), t_val.to(device)
                t_recip_val = torch.where(t_val > 0, 1.0 / t_val, torch.zeros_like(t_val))
                pred_val = model(x_val)
                loss_val = criterion(pred_val, t_recip_val)
                val_loss += loss_val.item() * x_val.size(0)

                # Back to moves; predictions at or below 1e-6 become the
                # sentinel 1000 instead of an exploding reciprocal.
                # NOTE(review): rows with t == 0 are included in the move-error
                # metrics below — confirm whether they should be excluded.
                pred_moves = torch.where(pred_val > 1e-6, 1.0 / pred_val, torch.full_like(pred_val, 1000.0))
                abs_move_errors = torch.abs(pred_moves - t_val)

                abs_error_sum += abs_move_errors.sum().item()
                correct_within_threshold += (abs_move_errors <= error_threshold).sum().item()
                total_val_samples += t_val.size(0)

        avg_val_loss = _mean(val_loss, total_val_samples)
        mean_abs_error = _mean(abs_error_sum, total_val_samples)
        val_accuracy = _mean(correct_within_threshold, total_val_samples)

        avg_val_losses.append(avg_val_loss)
        val_mean_abs_errors.append(mean_abs_error)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}: Train Loss={avg_train_loss:.4f}, Val Loss={avg_val_loss:.4f}, Val Acc={val_accuracy:.4f}, Mean Abs Error={mean_abs_error:.4f} moves")

        torch.save(model.state_dict(), f"{save_path}_epoch{epoch+1}.pt")

    # Save metrics, merge if resuming
    metrics_file = f"{save_path}_metrics.json"
    if os.path.exists(metrics_file) and start_epoch > 0:
        with open(metrics_file, 'r') as f:
            existing = json.load(f)
        avg_train_losses = existing.get('train_loss', []) + avg_train_losses
        avg_val_losses = existing.get('val_loss', []) + avg_val_losses
        val_accuracies = existing.get('val_accuracy', []) + val_accuracies
        val_mean_abs_errors = existing.get('val_mean_abs_error', []) + val_mean_abs_errors

    with open(metrics_file, 'w') as f:
        json.dump({
            'train_loss': avg_train_losses,
            'val_loss': avg_val_losses,
            'val_accuracy': val_accuracies,
            'val_mean_abs_error': val_mean_abs_errors
        }, f)


# --- Constants ---
EPOCHS = 20
BATCH_SIZE = 512
LR = 1e-4
DATA_PATH = 'data/trainingpuzzles.csv'
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
START_EPOCH_BINARY = 4
START_EPOCH_DISTANCE = 4

# --- Load dataset ---
full_ds = ChessPuzzleBinaryDataset(DATA_PATH, task='binary')

# Use actual length of balanced + flipped dataset for splitting
dataset_len = len(full_ds)
print(f"Total balanced dataset size (including flipped): {dataset_len}")

indices = list(range(dataset_len))

# The dataset stores the n balanced rows followed by their n mirrored copies
# (same order), so rows i and i + n show the same puzzle. Split on the base
# rows and send each mirror to the same side as its original; otherwise a
# position could be trained on while its mirror image is used for validation.
n_base = dataset_len // 2
base_labels = full_ds.df['label'].values[:n_base]
base_train, base_val = train_test_split(
    list(range(n_base)),
    test_size=0.1,
    random_state=42,
    stratify=base_labels
)
train_idx = base_train + [i + n_base for i in base_train]
val_idx = base_val + [i + n_base for i in base_val]
assert not set(train_idx) & set(val_idx)
assert len(train_idx) + len(val_idx) == 2 * n_base

train_loader_binary = DataLoader(
    Subset(full_ds, train_idx),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True
)

val_loader_binary = DataLoader(
    Subset(full_ds, val_idx),
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

# --- Model class IsMateCNN must already be defined by an earlier cell ---
binary_model = IsMateCNN()

# Train binary model from saved epoch 4
train_binary(binary_model, train_loader_binary, val_loader_binary, DEVICE, save_path='binary_model', start_epoch=START_EPOCH_BINARY)


In [None]:
# Rebuild the classifier and load the epoch-16 weights onto the CPU.
model = IsMateCNN()
model.load_state_dict(torch.load('binary_model_epoch16.pt', map_location=torch.device('cpu')))
# Evaluation mode (disables dropout); the returned module repr is the cell output.
model.eval()

IsMateCNN(
  (conv): Sequential(
    (0): Conv2d(23, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=512, out_features=256, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.5, inplace=False)
    (4): Linear(in_features=256, out_features=2, bias=True)
  )
)

In [None]:
import pandas as pd
class ChessPuzzleBinaryDatasetUnbalanced(Dataset):
    """Puzzle dataset that keeps the CSV's original class distribution.

    Reads a CSV with ``FEN`` and ``Number`` columns (``Number`` is the mate
    distance, 0 meaning "no mate") and performs no re-sampling.

    Args:
        csv_file: path of the CSV file.
        task: ``'binary'`` -> target is the class label ``Number != 0``
            (long); anything else -> target is ``Number`` (float32).
    """

    def __init__(self, csv_file, task='binary'):
        self.df = pd.read_csv(csv_file)

        self.task = task  # 'binary' or 'distance'

        if self.task == 'binary':
            # Create binary label: 0 if number==0, else 1
            self.df['distance'] = self.df['Number']
            self.df['label'] = (self.df['Number'] != 0).astype(int)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(board_tensor, target)`` for row ``idx``."""
        row = self.df.iloc[idx]
        x = fen_to_tensor(row['FEN'])

        if self.task == 'binary':
            y = torch.tensor(row['label'], dtype=torch.long)
        else:
            # Distance regression target
            y = torch.tensor(int(row['Number']), dtype=torch.float32)

        return x, y

# Held-out test set, evaluated with its natural (unbalanced) label distribution.
# NOTE(review): the training CSV lives under 'data/'; this path has no
# directory prefix — confirm the file location.
test_ds = ChessPuzzleBinaryDatasetUnbalanced('testingpuzzles.csv', task='binary')
# shuffle=False keeps predictions aligned with the CSV row order.
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)


In [None]:
all_preds = []
all_labels = []
print(f"Total samples in test dataset: {len(test_ds)}")
total_processed = 0

# Run on whatever device the loaded model lives on, instead of relying on a
# `device` variable left over from some other cell.
device = next(model.parameters()).device
model.eval()  # make sure dropout is disabled for inference

with torch.no_grad():
    for x, y in test_loader:
        outputs = model(x.to(device))
        preds = outputs.argmax(dim=1)

        # Results are accumulated on the CPU; labels never need to leave it.
        all_preds.append(preds.cpu())
        all_labels.append(y.cpu())

        total_processed += y.size(0)
        # True on the batch in which the running total crosses a multiple of 5000.
        if total_processed % 5000 < y.size(0):
            print(f"Processed {total_processed} puzzles so far...")
            print(f"Total predictions collected: {sum(p.size(0) for p in all_preds)}")

all_preds = torch.cat(all_preds)
all_labels = torch.cat(all_labels)


Total samples in test dataset: 171444




Processed 5120 puzzles so far...
Total predictions collected: 5120
Processed 10240 puzzles so far...
Total predictions collected: 10240
Processed 15360 puzzles so far...
Total predictions collected: 15360
Processed 20480 puzzles so far...
Total predictions collected: 20480
Processed 25088 puzzles so far...
Total predictions collected: 25088
Processed 30208 puzzles so far...
Total predictions collected: 30208
Processed 35328 puzzles so far...
Total predictions collected: 35328
Processed 40448 puzzles so far...
Total predictions collected: 40448
Processed 45056 puzzles so far...
Total predictions collected: 45056
Processed 50176 puzzles so far...
Total predictions collected: 50176
Processed 55296 puzzles so far...
Total predictions collected: 55296
Processed 60416 puzzles so far...
Total predictions collected: 60416
Processed 65024 puzzles so far...
Total predictions collected: 65024
Processed 70144 puzzles so far...
Total predictions collected: 70144
Processed 75264 puzzles so far...
To

In [None]:
import numpy as np

# Inputs: all_labels / all_preds as 1D tensors or numpy arrays of 0/1 values.
# Tensors are moved to the CPU and converted; anything else is used as is.
y_true = all_labels.cpu().numpy() if isinstance(all_labels, torch.Tensor) else all_labels
y_pred = all_preds.cpu().numpy() if isinstance(all_preds, torch.Tensor) else all_preds

# Boolean masks for the four marginal conditions (0 = No Mate, 1 = Mate).
actual_no_mate = (y_true == 0)
actual_mate = (y_true == 1)
predicted_no_mate = (y_pred == 0)
predicted_mate = (y_pred == 1)

# Confusion-matrix cells, counted by combining the masks.
tn = np.sum(actual_no_mate & predicted_no_mate)  # correctly predicted No Mate
fp = np.sum(actual_no_mate & predicted_mate)     # predicted Mate, was No Mate
fn = np.sum(actual_mate & predicted_no_mate)     # predicted No Mate, was Mate
tp = np.sum(actual_mate & predicted_mate)        # correctly predicted Mate

# Text rendering of the confusion matrix
rule = "-------------------------"
print("Confusion Matrix:")
print(rule)
print(f"{'':<15} | {'Predicted No Mate':<15} | {'Predicted Mate':<15}")
print(rule)
print(f"{'Actual No Mate':<15} | {tn:<15} | {fp:<15}")
print(f"{'Actual Mate':<15} | {fn:<15} | {tp:<15}")
print(rule)

# Summary statistics; every ratio falls back to 0 when its denominator is 0.
total_samples = len(y_true)
accuracy = (tp + tn) / total_samples
if (tp + fp) > 0:
    precision = tp / (tp + fp)
else:
    precision = 0
if (tp + fn) > 0:
    recall = tp / (tp + fn)
else:
    recall = 0
if (precision + recall) > 0:
    f1_score = 2 * (precision * recall) / (precision + recall)
else:
    f1_score = 0

print(f"\nOverall Accuracy: {accuracy:.4f}")
print(f"Precision (for Mate): {precision:.4f}")
print(f"Recall (for Mate): {recall:.4f}")
print(f"F1-Score (for Mate): {f1_score:.4f}")

Confusion Matrix:
-------------------------
                | Predicted No Mate | Predicted Mate 
-------------------------
Actual No Mate  | 24565           | 5435           
Actual Mate     | 24753           | 116691         
-------------------------

Overall Accuracy: 0.8239
Precision (for Mate): 0.9555
Recall (for Mate): 0.8250
F1-Score (for Mate): 0.8855


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Labels and predictions as 1D arrays of 0s and 1s (tensors are converted).
y_true = all_labels.cpu().numpy() if isinstance(all_labels, torch.Tensor) else all_labels
y_pred = all_preds.cpu().numpy() if isinstance(all_preds, torch.Tensor) else all_preds

class_names = ['No Mate', 'Mate']

# Raw counts: rows are the actual class, columns the predicted class.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

# Row-normalised version: each row sums to 1, i.e. per-class rates.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = cm.astype('float') / row_totals

# Figure 1: absolute counts
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap='Blues', values_format='d')
plt.title("Confusion Matrix (Mate / No Mate)")
plt.show()

# Figure 2: per-class rates
disp_norm = ConfusionMatrixDisplay(confusion_matrix=cm_normalized, display_labels=class_names)
disp_norm.plot(cmap='Blues', values_format=".2f")
plt.title("Normalized Confusion Matrix (Mate / No Mate)")
plt.show()


In [None]:
# Sanity check: compare row counts across the CSV, the dataset and the
# collected predictions / labels.
# NOTE(review): `df_test` is not defined in any visible cell — presumably left
# over from a deleted cell; confirm (or re-create it) before re-running.
print(f"CSV rows: {len(df_test)}")
print(f"Dataset size: {len(test_loader.dataset)}")
print(f"All preds length: {len(all_preds)}")
print(f"All labels length: {len(all_labels)}")

CSV rows: 171444
Dataset size: 60000
All preds length: 36
All labels length: 36


In [None]:
from torchviz import make_dot

# Fresh, untrained IsMateCNN — only the graph structure matters here.
# NOTE: this rebinds `model`, replacing any previously loaded weights.
model = IsMateCNN()
# Random input with the (C, H, W) = (23, 8, 8) layout the first Conv2d expects, batch size 1
dummy_input = torch.randn(1, 23, 8, 8)
# Build the autograd graph of one forward pass, labelling nodes with parameter names
dot = make_dot(model(dummy_input), params=dict(model.named_parameters()))
dot.format = 'png'
dot.render('is_mate_cnn_graph', cleanup=True) # writes is_mate_cnn_graph.png; cleanup=True removes the intermediate source file
# The rendered 'is_mate_cnn_graph.png' can then be opened from the files pane.

'is_mate_cnn_graph.png'

In [None]:
import pandas as pd

# Read the test puzzles
df = pd.read_csv('testingpuzzles.csv')

# Number of rows per distinct 'Number' value, ordered by that value
counts = df['Number'].value_counts().sort_index()

# Share of each value, in percent of all counted rows
percentages = counts.div(counts.sum()).mul(100)

# One line per value, two decimals
for number, percent in zip(percentages.index, percentages.to_numpy()):
    print(f"Number {number}: {percent:.2f}%")


Number 0: 17.50%
Number 1: 36.60%
Number 2: 35.69%
Number 3: 8.68%
Number 4: 1.26%
Number 5: 0.27%
