In [1]:
import chess.pgn
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical
'''
# Function to parse PGN file and extract game data
def parse_pgn_file(file_path, max_games=5000):
    games = []
    with open(file_path) as f:
        game_counter = 0
        while True:
            game = chess.pgn.read_game(f)
            if game is None or game_counter >= max_games:
                break
            games.append(game)
            game_counter += 1
    return games

# Load PGN data
pgn_file = "/Users/ruvinjagoda/Desktop/Aka/AIP/Chess_code/2023_full.pgn"
games = parse_pgn_file(pgn_file, max_games=50000)

# Check the number of games loaded
print(f"Number of games loaded: {len(games)}")
'''
pgn_file = "/Users/ruvinjagoda/Desktop/Aka/AIP/Chess_code/2023_full.pgn"
# Function to parse PGN file and yield game data in batches
def parse_pgn_file_in_batches(file_path, batch_size=5000):
    with open(file_path) as f:
        while True:
            games = []
            for _ in range(batch_size):
                game = chess.pgn.read_game(f)
                if game is None:
                    break
                games.append(game)
            if not games:
                break
            yield games



In [None]:
import numpy as np
import chess
import chess.pgn
from tensorflow.keras import models, layers, losses
from tensorflow.keras.utils import to_categorical
import tensorflow as tf

# Function to convert board state to one-hot encoding
def board_to_one_hot(board):
    one_hot = np.zeros((8, 8, 12), dtype=np.int8)
    piece_map = {chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2,
                 chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5}
    for square in chess.scan_reversed(chess.BB_ALL):
        piece = board.piece_at(square)
        if piece is not None:
            piece_index = piece_map[piece.piece_type] + (6 if piece.color else 0)
            one_hot[chess.square_rank(square), chess.square_file(square), piece_index] = 1
    return one_hot

# Function to convert move to label
def move_to_label(move):
    from_square = move.from_square
    to_square = move.to_square
    return from_square * 64 + to_square

# Generator function to yield batches of data
def data_generator(games, batch_size):
    X_batch, y_batch, boards = [], [], []
    while True:
        for game in games:
            board = game.board()
            for move in game.mainline_moves():
                X_batch.append(board_to_one_hot(board))
                y_batch.append(move_to_label(move))
                boards.append(board.copy())
                board.push(move)
                if len(X_batch) == batch_size:
                    yield np.array(X_batch, dtype=np.float32), np.array(y_batch), boards
                    X_batch, y_batch, boards = [], [], []

# Define the CNN model
def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(8, 8, 12), padding='same'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dense(4096, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Custom training loop
def train_model(model, games, batch_size, epochs_per_cycle, steps_per_epoch):
    optimizer = tf.keras.optimizers.Adam()
    illegal_move_penalty = 1000  # Large penalty for illegal moves

    for epoch in range(epochs_per_cycle):
        print(f"Epoch {epoch + 1}/{epochs_per_cycle}")
        for step, (X_batch, y_batch, boards) in enumerate(data_generator(games, batch_size)):
            if step >= steps_per_epoch:
                break

            with tf.GradientTape() as tape:
                predictions = model(X_batch, training=True)
                loss = tf.keras.losses.categorical_crossentropy(to_categorical(y_batch, num_classes=4096), predictions)

                # Penalty for illegal moves
                for i, board in enumerate(boards):
                    from_square, to_square = divmod(tf.argmax(predictions[i]).numpy(), 64)
                    move = chess.Move(from_square, to_square)
                    if move not in board.legal_moves:
                        loss = loss + illegal_move_penalty

            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            if step % 100 == 0:
                print(f"Step {step}/{steps_per_epoch}, Loss: {tf.reduce_mean(loss).numpy()}")

# Training parameters
batch_size = 128
games_per_batch = 5000
epochs_per_cycle = 5  # Number of epochs to train on each batch of games

# Train the model in cycles
total_games = 50000
model = create_model()

for cycle in range(total_games // games_per_batch):
    print(f"Training cycle {cycle + 1}/{total_games // games_per_batch}")
    train_games = next(parse_pgn_file_in_batches(pgn_file, games_per_batch))
    steps_per_epoch = sum(1 for game in train_games for _ in game.mainline_moves()) // batch_size
    
    train_model(model, train_games, batch_size, epochs_per_cycle, steps_per_epoch)

# Evaluate the model using the last chunk of test games
test_games = next(parse_pgn_file_in_batches(pgn_file, games_per_batch))
validation_steps = sum(1 for game in test_games for _ in game.mainline_moves()) // batch_size

def evaluate_model(model, games, batch_size, steps):
    total_loss = 0.0
    total_acc = 0.0
    for step, (X_batch, y_batch, boards) in enumerate(data_generator(games, batch_size)):
        if step >= steps:
            break
        predictions = model(X_batch, training=False)
        loss = tf.keras.losses.categorical_crossentropy(to_categorical(y_batch, num_classes=4096), predictions)
        
        # Check for illegal moves and penalize
        for i, board in enumerate(boards):
            from_square, to_square = divmod(tf.argmax(predictions[i]).numpy(), 64)
            move = chess.Move(from_square, to_square)
            if move not in board.legal_moves:
                loss = loss + illegal_move_penalty

        total_loss += tf.reduce_mean(loss).numpy()
        total_acc += tf.reduce_mean(tf.keras.metrics.categorical_accuracy(to_categorical(y_batch, num_classes=4096), predictions)).numpy()

    avg_loss = total_loss / steps
    avg_acc = total_acc / steps
    return avg_loss, avg_acc

test_loss, test_acc = evaluate_model(model, test_games, batch_size, validation_steps)
print(f"Test accuracy: {test_acc}")


Training cycle 1/10




Epoch 1/5
Step 0/2903, Loss: 118008.3125
Step 100/2903, Loss: 126006.9375
Step 200/2903, Loss: 116006.34375
Step 300/2903, Loss: 103006.515625
Step 400/2903, Loss: 95006.03125
Step 500/2903, Loss: 116005.8046875
Step 600/2903, Loss: 87005.78125
Step 700/2903, Loss: 90006.40625
Step 800/2903, Loss: 84005.90625
Step 900/2903, Loss: 96005.953125
Step 1000/2903, Loss: 77006.359375
Step 1100/2903, Loss: 93005.171875
Step 1200/2903, Loss: 78006.109375
Step 1300/2903, Loss: 92006.2890625
Step 1400/2903, Loss: 95006.4765625
Step 1500/2903, Loss: 86006.65625
Step 1600/2903, Loss: 98006.28125
Step 1700/2903, Loss: 84005.875
Step 1800/2903, Loss: 78006.03125
Step 1900/2903, Loss: 72005.65625
Step 2000/2903, Loss: 75006.21875
Step 2100/2903, Loss: 86005.9140625
Step 2200/2903, Loss: 73005.8828125
Step 2300/2903, Loss: 79006.09375
Step 2400/2903, Loss: 85005.734375
Step 2500/2903, Loss: 83006.03125
Step 2600/2903, Loss: 87005.640625
Step 2700/2903, Loss: 95006.2265625
Step 2800/2903, Loss: 100006.2

In [None]:
model.save("model_weights_v5.h5")
