# Chess Position Evaluation Model Training

Train neural networks to predict Stockfish evaluation from chess positions.

**Features:**
- Downloads pre-computed evaluations from Lichess (depth 30-40+)
- 8 model architectures to compare (MLP, CNN, ResNet, Transformer, and NNUE-style variants)
- Uses GPU for fast training

**Runtime:** Select GPU: `Runtime → Change runtime type → T4 GPU`

In [None]:
# Check GPU: query the name and total memory of each NVIDIA device and
# print the result in CSV form.
!nvidia-smi --query-gpu=name,memory.total --format=csv

## 1. Setup & Install Dependencies

In [None]:
# Install the Python packages, plus the zstd command-line tool that the
# download step uses to decompress the Lichess dump.
!pip install -q torch numpy requests
!apt-get install -qq zstd

# Report the PyTorch version and whether a CUDA device is usable.
import torch
print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Name of the first CUDA device (index 0).
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2. Download Lichess Evaluation Data (Streaming)

In [None]:
# Extraction settings
NUM_POSITIONS = 100_000  # positions to collect; raise towards 1M for better results
MIN_DEPTH = 30           # keep only evaluations searched to at least this depth
SKIP_MATES = True        # drop forced-mate evaluations for stable training

print(f"Will extract {NUM_POSITIONS:,} positions with depth >= {MIN_DEPTH}")

In [None]:
import subprocess
import json
import time
import os

LICHESS_EVAL_URL = "https://database.lichess.org/lichess_db_eval.jsonl.zst"

# Centipawn scores beyond this magnitude are dropped for training stability.
_MAX_ABS_CP = 5000


def _score_from_entry(entry, min_depth, skip_mates):
    """Return ``(fen, score_cp)`` for a usable entry, or ``None`` if filtered out."""
    fen = entry.get("fen")
    evals = entry.get("evals", [])
    if not fen or not evals:
        return None

    # Use the deepest evaluation available for this position.
    best_eval = max(evals, key=lambda e: e.get("depth", 0))
    if best_eval.get("depth", 0) < min_depth:
        return None

    pvs = best_eval.get("pvs", [])
    if not pvs:
        return None
    pv = pvs[0]

    if "cp" in pv:
        score_cp = pv["cp"]
        if abs(score_cp) > _MAX_ABS_CP:
            return None
    elif "mate" in pv:
        if skip_mates:
            return None
        # Map "mate in N" to a large score that shrinks with distance to mate.
        # The extreme-score filter is deliberately NOT applied here: every
        # mate score exceeds it, so applying it would make skip_mates=False
        # a no-op.
        mate_in = pv["mate"]
        score_cp = 10000 - abs(mate_in) * 10
        if mate_in < 0:
            score_cp = -score_cp
    else:
        return None

    return fen, score_cp


def _extract_from_lines(lines, limit, min_depth, skip_mates, start_time):
    """Scan JSONL lines and return ``(positions, scores, malformed_count)``."""
    positions = []
    scores = []
    processed = 0
    malformed = 0

    for line in lines:
        if len(positions) >= limit:
            break

        processed += 1

        # Progress update every 10K entries
        if processed % 10000 == 0:
            elapsed = time.time() - start_time
            rate = processed / elapsed if elapsed > 0 else 0.0
            print(f"\rProcessed: {processed:,} | Found: {len(positions):,}/{limit:,} | Rate: {rate:.0f}/s", end="")

        try:
            parsed = _score_from_entry(json.loads(line.strip()), min_depth, skip_mates)
        except (ValueError, TypeError, AttributeError, KeyError):
            # Malformed JSON or an entry with an unexpected shape: skip it,
            # but keep count so the problem is visible in the summary.
            malformed += 1
            continue

        if parsed is not None:
            fen, score_cp = parsed
            positions.append(fen)
            scores.append(score_cp)

    return positions, scores, malformed


def stream_and_extract_positions(limit, min_depth, skip_mates=True, lines=None):
    """
    Extract (FEN, score) pairs from the Lichess evaluation database.

    By default the compressed dump is streamed from LICHESS_EVAL_URL through
    ``curl`` and ``zstd`` without touching the disk.

    Args:
        limit: Maximum number of positions to collect.
        min_depth: Minimum depth of a position's deepest evaluation.
        skip_mates: If True, positions whose best line is a forced mate are
            skipped. If False they are kept with a score of
            ``+/-(10000 - 10 * moves_to_mate)``.
        lines: Optional iterable of JSONL strings (for example an open local
            file). When given, nothing is downloaded.

    Returns:
        ``(positions, scores)``: a list of FEN strings and a parallel list of
        scores in centipawns. Fewer than ``limit`` entries are returned if the
        input ends first.

    Raises:
        OSError: If ``curl`` or ``zstd`` cannot be started.
    """
    if lines is None:
        print("Streaming from Lichess database...")
    else:
        print("Reading from provided lines...")
    print(f"Target: {limit:,} positions, min depth: {min_depth}")
    print("This may take 10-30 minutes depending on depth filter...\n")

    start_time = time.time()

    if lines is not None:
        positions, scores, malformed = _extract_from_lines(
            lines, limit, min_depth, skip_mates, start_time
        )
    else:
        # Stream: curl -> zstd decompress -> python. The two processes are
        # started explicitly (no shell) so both can be stopped and reaped.
        curl_cmd = ["curl", "-sL", LICHESS_EVAL_URL]
        zstd_cmd = ["zstd", "-d", "-c"]
        with subprocess.Popen(
            curl_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        ) as curl:
            with subprocess.Popen(
                zstd_cmd,
                stdin=curl.stdout,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1,
            ) as zstd:
                # Drop our copy of the pipe so curl sees a broken pipe if
                # zstd exits first.
                curl.stdout.close()
                try:
                    positions, scores, malformed = _extract_from_lines(
                        zstd.stdout, limit, min_depth, skip_mates, start_time
                    )
                finally:
                    # Usually we stop long before the dump ends; end both
                    # processes so leaving the `with` blocks does not wait
                    # for the full download.
                    zstd.terminate()
                    curl.terminate()

    elapsed = time.time() - start_time
    print(f"\n\nDone! Extracted {len(positions):,} positions in {elapsed:.1f}s")
    if malformed:
        print(f"Skipped {malformed:,} malformed entries")
    if len(positions) < limit:
        print(f"Warning: input ended after {len(positions):,} of {limit:,} requested positions")

    return positions, scores

# Run the extraction with the configured limit, depth and mate filter
positions, scores = stream_and_extract_positions(
    NUM_POSITIONS, MIN_DEPTH, skip_mates=SKIP_MATES
)

In [None]:
# Quick stats
import numpy as np

scores_np = np.array(scores)
print(f"Positions: {len(positions):,}")
if scores_np.size:
    print(f"Score range: {scores_np.min():.0f} to {scores_np.max():.0f} centipawns")
    print(f"Mean score: {scores_np.mean():.1f} cp")
    print(f"Std dev: {scores_np.std():.1f} cp")
else:
    # min()/max() raise on an empty array; report the situation instead.
    print("No positions were extracted - check the download step and the filters.")

# Sample positions (up to three; fewer if the extraction came up short)
print("\nSample positions:")
for fen, score in zip(positions[:3], scores[:3]):
    print(f"  {fen[:50]}... → {score} cp")

## 3. Prepare Dataset

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np

# Plane index for each FEN piece letter: white pieces 0-5, black pieces 6-11
PIECE_TO_INDEX = {symbol: plane for plane, symbol in enumerate("PNBRQKpnbrqk")}


def fen_to_tensor(fen):
    """Encode the piece-placement field of a FEN string as a 12x8x8 float32 array.

    Axis 0 is the piece plane (see PIECE_TO_INDEX), axis 1 the row in FEN
    order (first row of the FEN first) and axis 2 the column. Only the first
    whitespace-separated field of ``fen`` is read.
    """
    planes = np.zeros((12, 8, 8), dtype=np.float32)
    placement = fen.split()[0]
    rank = 0
    file = 0

    for symbol in placement:
        if symbol == '/':
            # Row separator: move to the start of the next row.
            rank, file = rank + 1, 0
            continue
        if symbol.isdigit():
            # A digit encodes that many consecutive empty squares.
            file += int(symbol)
            continue
        plane = PIECE_TO_INDEX.get(symbol)
        if plane is None:
            # Unknown characters are ignored without advancing the column.
            continue
        if rank < 8 and file < 8:
            planes[plane, rank, file] = 1.0
        file += 1

    return planes

class ChessDataset(Dataset):
    """Dataset of encoded boards paired with normalized centipawn scores.

    Every FEN is encoded up front with ``fen_to_tensor``. Scores are divided
    by ``score_scale`` (500.0, so 500 cp maps to 1.0) to form the targets.
    """

    def __init__(self, positions, scores):
        print("Converting FEN to tensors...")
        encoded = [fen_to_tensor(fen) for fen in positions]
        self.boards = np.array(encoded)
        self.scores = np.array(scores, dtype=np.float32)

        # Normalization factor: 5 pawns = 1.0
        self.score_scale = 500.0
        self.scores_normalized = self.scores / self.score_scale
        print(f"Dataset ready: {len(self)} positions")

    def __len__(self):
        return len(self.boards)

    def __getitem__(self, idx):
        """Return ``(board_tensor, normalized_score)`` for sample ``idx``."""
        board = torch.from_numpy(self.boards[idx])
        target = torch.tensor(self.scores_normalized[idx], dtype=torch.float32)
        return (board, target)

# Encode all extracted positions
dataset = ChessDataset(positions, scores)

# 80/10/10 split; the test split takes whatever is left after rounding
n_total = len(dataset)
train_size = int(0.8 * n_total)
val_size = int(0.1 * n_total)
test_size = n_total - train_size - val_size

# Fixed seed so the split is reproducible
split_rng = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_rng
)

print(f"Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")

# Only the training loader shuffles
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

## 4. Model Architectures

In [None]:
import torch.nn.functional as F

# ============= MLP Models =============

class MLP_Small(nn.Module):
    """Compact fully-connected baseline (213,377 parameters).

    Flattens the 12x8x8 board and maps it through 768 -> 256 -> 64 -> 1 with
    ReLU activations and dropout (p=0.2) after the first hidden layer.
    """

    def __init__(self):
        super().__init__()
        stack = [
            nn.Flatten(),
            nn.Linear(12 * 8 * 8, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        prediction = self.layers(x)
        return prediction.squeeze(-1)

class MLP_Large(nn.Module):
    """Wider and deeper fully-connected network (1,443,841 parameters).

    Flattens the 12x8x8 board and maps it through 768 -> 1024 -> 512 -> 256 -> 1
    with ReLU activations and dropout (p=0.3) after the first two hidden layers.
    """

    def __init__(self):
        super().__init__()
        stack = [
            nn.Flatten(),
            nn.Linear(12 * 8 * 8, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        prediction = self.layers(x)
        return prediction.squeeze(-1)

# ============= CNN Models =============

class CNN_Medium(nn.Module):
    """Three-layer CNN with a dense head (about 8.8M parameters).

    Three 3x3 convolutions (12 -> 64 -> 128 -> 256 channels), each followed
    by batch norm and ReLU, then a 512-unit dense layer with dropout (p=0.3)
    and a single-output regression layer.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(12, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(256)
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        h = x
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            h = F.relu(norm(conv(h)))
        # Flatten all feature maps into one vector per sample
        h = h.view(h.shape[0], -1)
        h = self.dropout(F.relu(self.fc1(h)))
        return self.fc2(h).squeeze(-1)

# ============= ResNet Models =============

class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with an identity skip connection.

    The channel count and spatial size are preserved, so blocks can be stacked.
    """

    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        residual = x
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        # Add the skip connection before the final activation
        return F.relu(x + residual)


class _ResNetEvaluator(nn.Module):
    """Shared body of the ResNet evaluators.

    A 3x3 input convolution, ``num_blocks`` residual blocks of ``channels``
    channels, then a dense head (``hidden`` units, dropout p=0.3) producing
    one score per sample. Attribute names and creation order match the
    original per-size classes, so saved state dicts remain compatible.
    """

    def __init__(self, channels, num_blocks, hidden):
        super().__init__()
        self.conv_in = nn.Conv2d(12, channels, 3, padding=1)
        self.bn_in = nn.BatchNorm2d(channels)
        self.blocks = nn.Sequential(*[ResidualBlock(channels) for _ in range(num_blocks)])
        self.fc1 = nn.Linear(channels * 8 * 8, hidden)
        self.fc2 = nn.Linear(hidden, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        x = F.relu(self.bn_in(self.conv_in(x)))
        x = self.blocks(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x).squeeze(-1)


class ResNet_Small(_ResNetEvaluator):
    """Small ResNet: 4 blocks x 64 channels, 256-unit head (about 1.35M params)."""

    def __init__(self):
        super().__init__(channels=64, num_blocks=4, hidden=256)


class ResNet_Medium(_ResNetEvaluator):
    """Medium ResNet: 8 blocks x 128 channels, 512-unit head (about 6.6M params)."""

    def __init__(self):
        super().__init__(channels=128, num_blocks=8, hidden=512)


class ResNet_Large(_ResNetEvaluator):
    """Large ResNet: 16 blocks x 256 channels, 512-unit head (about 27.3M params)."""

    def __init__(self):
        super().__init__(channels=256, num_blocks=16, hidden=512)

# ============= Transformer =============

class ChessTransformer(nn.Module):
    """Transformer encoder over the 64 squares (about 4.8M params by default).

    Each square is a token whose 12 piece-plane values are projected to
    ``d_model`` and added to a learned positional embedding. The encoder
    output is mean-pooled over squares and fed to a two-layer regression head.
    """

    def __init__(self, d_model=256, nhead=8, num_layers=6):
        super().__init__()
        self.input_proj = nn.Linear(12, d_model)
        # Learned per-square embedding, initialised with small random values
        self.pos_encoding = nn.Parameter(torch.randn(1, 64, d_model) * 0.02)
        layer_template = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=0.1,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(layer_template, num_layers=num_layers)
        self.fc1 = nn.Linear(d_model, 256)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        batch = x.size(0)
        # (batch, 12, 8, 8) -> (batch, 64 squares, 12 features)
        tokens = x.view(batch, 12, 64).transpose(1, 2)
        tokens = self.input_proj(tokens) + self.pos_encoding
        encoded = self.transformer(tokens)
        pooled = encoded.mean(dim=1)
        hidden = F.relu(self.fc1(pooled))
        return self.fc2(hidden).squeeze(-1)

# ============= NNUE-style =============

class NNUE_Style(nn.Module):
    """Small NNUE-inspired MLP with clipped activations (206,177 parameters).

    Layers are 768 -> 256 -> 32 -> 32 -> 1. Every hidden activation goes
    through ReLU and is then clamped to the range [0, 1].
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(12 * 64, 256)
        self.fc2 = nn.Linear(256, 32)
        self.fc3 = nn.Linear(32, 32)
        self.fc4 = nn.Linear(32, 1)

    def forward(self, x):
        """Return one score per sample, shape ``(batch,)``."""
        h = x.view(x.size(0), -1)
        for layer in (self.fc1, self.fc2, self.fc3):
            # Clipped ReLU: activations limited to [0, 1]
            h = F.relu(layer(h)).clamp(0, 1)
        return self.fc4(h).squeeze(-1)

# Registry: short name -> model class (each is constructed with no arguments)
MODELS = dict(
    mlp_small=MLP_Small,
    mlp_large=MLP_Large,
    cnn=CNN_Medium,
    resnet_small=ResNet_Small,
    resnet_medium=ResNet_Medium,
    resnet_large=ResNet_Large,
    transformer=ChessTransformer,
    nnue=NNUE_Style,
)

print(f"Available models: {list(MODELS.keys())}")

## 5. Training Functions

In [None]:
import time

def count_parameters(model):
    """Return the total number of scalar parameters in ``model``."""
    total = 0
    for tensor in model.parameters():
        total += tensor.numel()
    return total

def train_model(model, train_loader, val_loader, epochs, device, score_scale=500.0):
    """Train ``model`` with Adam + MSE and restore the best-validation weights.

    Args:
        model: Module mapping a batch of boards to one prediction per sample.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to; the model must already be on it.
        score_scale: Factor that converts normalized errors back to
            centipawns for the reported MAE.

    Returns:
        The same ``model`` object, loaded with the weights from the epoch
        that had the lowest validation loss. The weights are left as trained
        if no epoch produced a finite improvement.
    """
    import copy  # local import: keeps this cell self-contained

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)

    best_val_loss = float('inf')
    best_state = None

    # Guard the per-batch averages against empty loaders.
    n_train_batches = max(len(train_loader), 1)
    n_val_batches = max(len(val_loader), 1)

    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0.0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= n_train_batches

        # Validation
        model.eval()
        val_loss = 0.0
        val_mae = 0.0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                outputs = model(batch_x)
                val_loss += criterion(outputs, batch_y).item()
                val_mae += torch.abs(outputs - batch_y).mean().item() * score_scale
        val_loss /= n_val_batches
        val_mae /= n_val_batches

        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # Deep copy: state_dict() tensors share storage with the live
            # parameters, so a shallow .copy() would keep changing as
            # training continues.
            best_state = copy.deepcopy(model.state_dict())

        print(f"Epoch {epoch+1}/{epochs} | Train: {train_loss:.4f} | Val: {val_loss:.4f} | MAE: {val_mae:.1f} cp")

    # Load best weights (there is no snapshot if epochs == 0 or every
    # validation loss was NaN)
    if best_state is not None:
        model.load_state_dict(best_state)
    return model

def evaluate_model(model, test_loader, device, score_scale=500.0):
    """Evaluate ``model`` on ``test_loader`` and return error and speed metrics.

    Args:
        model: Module mapping a batch of boards to one prediction per sample.
        test_loader: Iterable of ``(inputs, targets)`` batches.
        device: Device the inputs are moved to; the model must already be on it.
        score_scale: Factor that converts normalized values back to centipawns.

    Returns:
        Dict with ``"mae"`` and ``"rmse"`` in centipawns, the Pearson
        ``"correlation"`` between predictions and targets, and
        ``"throughput"`` in positions per second (forward pass only).
    """
    model.eval()
    on_cuda = torch.device(device).type == "cuda"
    predictions = []
    actuals = []
    total_time = 0.0
    total_samples = 0

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x = batch_x.to(device)
            # CUDA kernels run asynchronously; synchronize around the timed
            # region so the forward pass itself is measured, not just its launch.
            if on_cuda:
                torch.cuda.synchronize(device)
            start = time.perf_counter()
            outputs = model(batch_x)
            if on_cuda:
                torch.cuda.synchronize(device)
            total_time += time.perf_counter() - start
            total_samples += len(batch_x)
            predictions.extend((outputs.cpu().numpy() * score_scale).tolist())
            actuals.extend((batch_y.cpu().numpy() * score_scale).tolist())

    predictions = np.array(predictions)
    actuals = np.array(actuals)

    mae = np.mean(np.abs(predictions - actuals))
    rmse = np.sqrt(np.mean((predictions - actuals) ** 2))
    correlation = np.corrcoef(predictions, actuals)[0, 1]
    # Samples per second over the whole run, so a short final batch does not
    # skew the figure.
    throughput = total_samples / total_time if total_time > 0 else float("inf")

    return {
        "mae": mae,
        "rmse": rmse,
        "correlation": correlation,
        "throughput": throughput,
    }

## 6. Train Single Model

In [None]:
# Architecture to train (any key of MODELS) and number of epochs
MODEL_NAME = "resnet_medium"  # Options: mlp_small, mlp_large, cnn, resnet_small, resnet_medium, resnet_large, transformer, nnue
EPOCHS = 30

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using: {device}")

# Instantiate the chosen architecture and move it to the device
model = MODELS[MODEL_NAME]()
model = model.to(device)
print(f"\nModel: {MODEL_NAME}")
print(f"Parameters: {count_parameters(model):,}")

# Fit the model
print(f"\nTraining for {EPOCHS} epochs...")
print("-" * 50)
model = train_model(model, train_loader, val_loader, EPOCHS, device, dataset.score_scale)

# Score the model on the held-out test split
metrics = evaluate_model(model, test_loader, device, dataset.score_scale)
rule = "=" * 50
mae_cp = metrics['mae']
print(f"\n{rule}")
print(f"RESULTS: {MODEL_NAME}")
print(f"{rule}")
print(f"MAE: {mae_cp:.1f} centipawns ({mae_cp/100:.2f} pawns)")
print(f"RMSE: {metrics['rmse']:.1f} centipawns")
print(f"Correlation: {metrics['correlation']:.4f}")
print(f"Throughput: {metrics['throughput']:.0f} positions/sec")

## 7. Compare All Models

In [None]:
# Train and score every registered architecture
EPOCHS = 20  # Fewer epochs for comparison
results = []
heading_rule = '#' * 60

for name, model_class in MODELS.items():
    print(f"\n{heading_rule}")
    print(f"Training: {name}")
    print(f"{heading_rule}")

    model = model_class().to(device)
    params = count_parameters(model)
    print(f"Parameters: {params:,}")

    model = train_model(model, train_loader, val_loader, EPOCHS, device, dataset.score_scale)
    metrics = evaluate_model(model, test_loader, device, dataset.score_scale)

    # One row per model: name, size, then the evaluation metrics
    row = {"model": name, "params": params}
    row.update(metrics)
    results.append(row)

    # Checkpoint the trained weights next to the notebook
    torch.save(model.state_dict(), f"best_{name}.pth")

# Summary table, best (lowest MAE) first
table_rule = '=' * 80
print(f"\n{table_rule}")
print("MODEL COMPARISON")
print(f"{table_rule}")
print(f"{'Model':<20} {'Params':>10} {'MAE (cp)':>10} {'Corr':>8} {'Speed':>12}")
print("-" * 80)

ranked = sorted(results, key=lambda entry: entry['mae'])
for r in ranked:
    print(f"{r['model']:<20} {r['params']:>10,} {r['mae']:>10.1f} {r['correlation']:>8.4f} {r['throughput']:>10.0f}/s")

## 8. Save Best Model to Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Save best model
import shutil
import os

save_dir = "/content/drive/MyDrive/chess_models"
os.makedirs(save_dir, exist_ok=True)

# `results` exists only if the comparison cell was run; treat a missing or
# empty list the same way instead of failing with NameError.
comparison = globals().get("results") or []

if comparison:
    # Copy the checkpoint the comparison loop wrote for the lowest-MAE model
    best = min(comparison, key=lambda x: x['mae'])
    print(f"Best model: {best['model']} (MAE: {best['mae']:.1f} cp)")

    src = f"best_{best['model']}.pth"
    dst = f"{save_dir}/best_{best['model']}.pth"
    shutil.copy(src, dst)
    print(f"Saved to: {dst}")
else:
    # Only the single-model cell was run. It keeps the trained `model` in
    # memory but writes no checkpoint file, so save the weights directly
    # rather than copying a file that does not exist.
    dst = f"{save_dir}/best_{MODEL_NAME}.pth"
    torch.save(model.state_dict(), dst)
    print(f"Saved to: {dst}")

## 9. Test Prediction

In [None]:
# Hand-picked positions for a quick sanity check of the model
test_fens = [
    "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1",  # Starting position
    "rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq e3 0 1",  # After 1.e4
    "r1bqkb1r/pppp1ppp/2n2n2/4p3/2B1P3/5N2/PPPP1PPP/RNBQK2R w KQkq - 4 4",  # Italian Game
    "8/8/8/8/8/5K2/4Q3/7k w - - 0 1",  # King + Queen vs King (winning)
]

model.eval()
print("Position Predictions:")
print("-" * 60)

# Inference only: no gradients needed for any of the predictions
with torch.no_grad():
    for fen in test_fens:
        # Encode the position and add a batch dimension of 1
        tensor = torch.from_numpy(fen_to_tensor(fen)).unsqueeze(0).to(device)
        # Convert the normalized output back to centipawns
        pred = model(tensor).item() * dataset.score_scale
        print(f"{fen[:40]}...")
        print(f"  Predicted: {pred:+.0f} cp ({pred/100:+.2f} pawns)\n")