Imports

In [None]:
import os
import chess
import chess.engine
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout, Conv2D, Flatten, Input
from collections import deque
import random
import csv
import h5py
import time
import threading

Constants and hyperparameters

In [None]:
# Training schedule and optimizer settings.
EPISODES = 10000  # default number of training episodes for train_agent
GAMMA = 0.95  # discount applied to the predicted next-state value in replay()
LEARNING_RATE = 0.001  # AdamW learning rate
WEIGHT_DECAY = 1e-4  # AdamW weight decay
BATCH_SIZE = 64  # number of transitions sampled per replay() call
MEMORY_SIZE = 10000  # maximum length of the agent's replay deque
SAVE_INTERVAL = 50  # default value of train_agent's save_interval parameter
# Softmax temperature schedule: tau starts at TAU_MAX and is multiplied by
# TAU_DECAY on each replay() call while it is still above TAU_MIN.
TAU_MAX = 1.0   
TAU_MIN = 0.1   
TAU_DECAY = 0.9995  
# MCTS search settings (defaults of act_with_mcts_and_softmax).
MAX_DEPTH = 16  # maximum number of random plies per rollout
NUM_THREADS = 4  # maximum number of simulation threads alive at once
TIME_LIMIT = 2.0  # wall-clock search budget per move, in seconds

Neural network

In [None]:
def build_model(input_shape=(8, 8, 12)):
    """Build and compile the convolutional position-evaluation network.

    Two 3x3 same-padded convolutions (64 filters each, ReLU) feed a 256-unit
    ReLU dense layer with 30% dropout and a single linear output unit. The
    model is compiled with AdamW (LEARNING_RATE, WEIGHT_DECAY) and a
    mean-squared-error loss.

    Args:
        input_shape: Shape of one encoded position, without the batch axis.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    layer_stack = [
        Input(shape=input_shape),
        Conv2D(64, kernel_size=3, activation='relu', padding='same'),
        Conv2D(64, kernel_size=3, activation='relu', padding='same'),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.3),
        Dense(1, activation='linear'),
    ]
    network = Sequential(layer_stack)
    network.compile(
        optimizer=tf.keras.optimizers.AdamW(
            learning_rate=LEARNING_RATE,
            weight_decay=WEIGHT_DECAY,
        ),
        loss='mean_squared_error',
    )
    return network

Node class for MCTS

In [None]:
class Node:
    """One position in the Monte Carlo search tree.

    A node owns a private copy of its board, links to its parent and the
    children expanded so far, and accumulates the visit count and summed
    simulation results that are written during backpropagation.
    """

    def __init__(self, board, parent=None, move=None, agent=None):
        """Create a node for *board*.

        Args:
            board: Position represented by this node; it is copied.
            parent: Node this one was expanded from, or None for the root.
            move: Move that led from the parent's board to this board.
            agent: Object providing ``softmax(values)``; used by best_child().
        """
        # Private copy so later pushes on the caller's board cannot alter this node.
        self.board = board.copy()
        self.parent = parent
        self.move = move
        self.children = []  # child nodes created so far by expand()
        self.visits = 0  # number of results added to this node
        self.value = 0.0  # running sum of those results
        self.agent = agent
        self.lock = threading.Lock()  # held while children/visits/value are updated

    def is_fully_expanded(self):
        """Return True when every legal move already has a child node."""
        return len(self.children) == len(list(self.board.legal_moves))

    def best_child(self):
        """Sample a child with probability given by the softmax of mean values.

        The mean value of a child is ``value / (visits + 1e-8)``; the
        temperature comes from ``self.agent.softmax``.

        Returns:
            One of ``self.children``.

        Raises:
            IndexError: If the node has no children.
        """
        with self.lock:
            children = self.children
            mean_values = np.array(
                [child.value / (child.visits + 1e-8) for child in children]
            )
            probabilities = self.agent.softmax(mean_values)
            if probabilities is None or len(probabilities) == 0:
                return random.choice(children)
            # Sample an index instead of handing Node objects to NumPy, which
            # would first coerce the whole list into an object array.
            index = np.random.choice(len(children), p=probabilities)
            return children[index]

    def expand(self):
        """Create and return a child for the first legal move not yet tried.

        Returns:
            The new child node, or None when every legal move has a child.
        """
        with self.lock:
            tried_moves = {child.move for child in self.children}
            for move in self.board.legal_moves:
                if move in tried_moves:
                    continue
                new_board = self.board.copy()
                new_board.push(move)
                child_node = Node(new_board, parent=self, move=move, agent=self.agent)
                self.children.append(child_node)
                return child_node
            return None  # No untried moves left

Chess agent

In [None]:
class ChessAgent:
    """Chess agent that scores positions with a value network and picks moves
    with a time-limited, multi-threaded Monte Carlo tree search.
    """

    def __init__(self):
        """Create an empty replay memory, a fresh model and an empty value cache."""
        self.memory = deque(maxlen=MEMORY_SIZE)  # (state, action, reward, next_state, done)
        self.tau = TAU_MAX  # softmax temperature; decayed in replay()
        self.model = build_model()
        # FEN string -> cached network value; emptied whenever the model is retrained.
        self.transposition_table = {}
        self.tree_lock = threading.Lock()  # currently not used by any method of this class

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def softmax(self, x):
        """Return the temperature-scaled softmax of *x*.

        Args:
            x: 1-D NumPy array of scores.

        Returns:
            Probabilities with the same length as *x*; a uniform distribution
            if the computation produced NaN/inf; None when *x* is empty.
        """
        if len(x) == 0:
            return None
        # Subtract the maximum so the largest exponent is exp(0).
        z = x - np.max(x)
        e_x = np.exp(z / self.tau)
        softmax_values = e_x / e_x.sum()
        if np.any(np.isnan(softmax_values)) or np.any(np.isinf(softmax_values)):
            return np.ones_like(e_x) / len(e_x)
        return softmax_values

    def act_with_mcts_and_softmax(self, board, time_limit=TIME_LIMIT, num_threads=NUM_THREADS, max_depth=MAX_DEPTH):
        """Search from *board* for *time_limit* seconds and sample a move.

        Simulations run in short-lived threads (at most *num_threads* alive at
        once, one simulation each). The returned move is sampled from the
        softmax of the root children's visit counts.

        Args:
            board: Position to move from; it must have at least one legal move.
            time_limit: Wall-clock budget in seconds for starting simulations.
            num_threads: Maximum number of simulation threads alive at once.
            max_depth: Maximum number of random plies per rollout.

        Returns:
            The chosen move. If no child was expanded within the budget, a
            uniformly random legal move.
        """
        root = Node(board, agent=self)
        root.visits = 1
        start_time = time.time()

        def run_simulation():
            # Selection: descend while the current node has a child for every legal move.
            node = root
            while node.is_fully_expanded() and not node.board.is_game_over():
                node = node.best_child()
            # Expansion: add one new child unless the position is terminal.
            if not node.board.is_game_over():
                node = node.expand()
                if node is None:
                    # Another thread expanded the last untried move first.
                    return
            result = self.simulate(node.board, max_depth=max_depth)
            self.backpropagate(node, result)

        threads = []
        while time.time() - start_time < time_limit:
            if len(threads) < num_threads:
                thread = threading.Thread(target=run_simulation)
                thread.start()
                threads.append(thread)
            threads = [t for t in threads if t.is_alive()]
            time.sleep(0.01)

        for thread in threads:
            thread.join()

        if not root.children:
            return random.choice(list(board.legal_moves))
        visits = np.array([child.visits for child in root.children], dtype=np.float32)
        probabilities = self.softmax(visits)
        # Sample an index rather than passing Node objects to NumPy.
        index = np.random.choice(len(root.children), p=probabilities)
        return root.children[index].move

    def backpropagate(self, node, result):
        """Add *result* to *node* and every ancestor, counting one visit each."""
        while node is not None:
            with node.lock:
                node.visits += 1
                node.value += result
            node = node.parent

    def simulate(self, board, max_depth=MAX_DEPTH):
        """Play up to *max_depth* uniformly random moves and score the result.

        The position reached is always scored with evaluate_state(), also
        when the rollout ended because the game was over.

        Args:
            board: Starting position; it is copied, not modified.
            max_depth: Maximum number of plies to play.

        Returns:
            The value evaluate_state() gives for the final position.
        """
        current_board = board.copy()
        depth = 0
        while not current_board.is_game_over() and depth < max_depth:
            legal_moves = list(current_board.legal_moves)
            if not legal_moves:
                break
            current_board.push(random.choice(legal_moves))
            depth += 1
        return self.evaluate_state(current_board)

    def evaluate_state(self, board):
        """Return the model's value for *board*, cached by FEN string."""
        board_fen = board.fen()
        cached = self.transposition_table.get(board_fen)
        if cached is not None:
            return cached
        input_state = self.state_to_input(board)
        value = self.model.predict(input_state, verbose=0)[0][0]
        self.transposition_table[board_fen] = value
        return value

    def replay(self):
        """Train the model on one random minibatch from the replay memory.

        The target of a transition is ``reward`` when it is terminal and
        ``reward + GAMMA * V(next_state)`` otherwise. Does nothing until the
        memory holds at least BATCH_SIZE transitions. After training, the
        value cache is cleared and the temperature is decayed.
        """
        if len(self.memory) < BATCH_SIZE:
            return
        minibatch = random.sample(self.memory, BATCH_SIZE)
        # Stored states carry a leading batch axis of 1 (see state_to_input),
        # so concatenating them gives arrays of shape (BATCH_SIZE, 8, 8, 12).
        states = np.concatenate([transition[0] for transition in minibatch], axis=0)
        next_states = np.concatenate([transition[3] for transition in minibatch], axis=0)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float32)
        dones = np.array([transition[4] for transition in minibatch], dtype=bool)

        # One batched forward pass replaces a predict() call per transition.
        next_values = self.model.predict(next_states, verbose=0)[:, 0]
        targets = np.where(dones, rewards, rewards + GAMMA * next_values)
        targets = targets.astype(np.float32).reshape(-1, 1)

        self.model.fit(states, targets, epochs=1, verbose=0)
        # Cached evaluations were produced by the previous weights.
        self.transposition_table.clear()
        if self.tau > TAU_MIN:
            self.tau *= TAU_DECAY

    def state_to_input(self, board):
        """Encode *board* as a one-hot array of shape (1, 8, 8, 12).

        Row 0 is rank 8 and column 0 is file a. Channels 0-5 hold White's
        pieces indexed by ``piece_type - 1``; channels 6-11 hold Black's.
        """
        planes = np.zeros((8, 8, 12))
        for square, piece in board.piece_map().items():
            channel = (piece.piece_type - 1) + (0 if piece.color == chess.WHITE else 6)
            row = 7 - chess.square_rank(square)
            col = chess.square_file(square)
            planes[row, col, channel] = 1
        return np.expand_dims(planes, axis=0)


Reward for agent

In [None]:
def material_count(board):
    """Return the material balance of *board*: White's total minus Black's.

    Pieces are weighted pawn 1, knight 3, bishop 3, rook 5, queen 9, king 0.
    """
    weights = {
        chess.PAWN: 1,
        chess.KNIGHT: 3,
        chess.BISHOP: 3,
        chess.ROOK: 5,
        chess.QUEEN: 9,
        chess.KING: 0
    }
    balance = 0
    for sq in chess.SQUARES:
        occupant = board.piece_at(sq)
        if not occupant:
            continue
        sign = 1 if occupant.color == chess.WHITE else -1
        balance += sign * weights[occupant.piece_type]
    return balance

def get_reward(board, previous_material_count):
    """Compute the shaped reward for the position on *board*.

    Terminal positions: +1 when Black is the side to move in a checkmate
    (i.e. Black has been mated), -1 when White is; 0 for stalemate or
    insufficient material. Otherwise the reward is the sum of:

    * 0.1 per point of change in material_count() since the previous count,
    * 0.05 per centre square (d4, e4, d5, e5) occupied by the side to move,
    * 0.01 per legal move of the side to move,
    * -0.3 if the side to move is in check.

    NOTE(review): the terminal and material terms are measured from White's
    point of view, while the centre, mobility and check terms follow
    ``board.turn``. When this is called right after White has moved, the
    side to move is Black -- confirm that this mix is intended.

    Args:
        board: Position to score.
        previous_material_count: material_count() of the earlier position.

    Returns:
        The reward as an int (terminal cases) or float.
    """
    if board.is_checkmate():
        return 1 if board.turn == chess.BLACK else -1
    if board.is_stalemate() or board.is_insufficient_material():
        return 0

    shaped = 0
    material_delta = material_count(board) - previous_material_count
    shaped += 0.1 * material_delta
    for centre in (chess.D4, chess.E4, chess.D5, chess.E5):
        occupant = board.piece_at(centre)
        if occupant and occupant.color == board.turn:
            shaped += 0.05
    mobility = len(list(board.legal_moves))
    shaped += 0.01 * mobility
    if board.is_check():
        shaped -= 0.3
    return shaped

Auxiliary functions

In [None]:
def save_training_state(agent, episode, model_save_path, stockfish_level):
    """Save the agent's model and training metadata to ``<path><episode>.h5``.

    The model is written with ``agent.model.save``; the same file is then
    reopened in append mode to store ``tau``, ``episode`` and
    ``stockfish_level`` as HDF5 attributes.

    Args:
        agent: Agent whose ``model`` and ``tau`` are saved.
        episode: Episode number; also used as the file name stem.
        model_save_path: Prefix placed directly in front of the file name.
        stockfish_level: Opponent level to record in the file.
    """
    checkpoint = f"{model_save_path}{episode}.h5"
    agent.model.save(checkpoint)
    metadata = (
        ('tau', agent.tau),
        ('episode', episode),
        ('stockfish_level', stockfish_level),
    )
    with h5py.File(checkpoint, 'a') as store:
        for key, value in metadata:
            store.attrs[key] = value
    print(f"Model i stan treningu zapisany po epizodzie {episode}")

def load_training_state(agent, model_save_path, after_episode=0):
    """Restore the model and training metadata saved for *after_episode*.

    When ``<model_save_path><after_episode>.h5`` exists, the model is loaded
    uncompiled, recompiled with AdamW and MSE loss, and ``agent.tau`` is
    restored from the file (TAU_MAX if the attribute is missing). When the
    file does not exist the agent is left untouched.

    Args:
        agent: Agent whose ``model`` and ``tau`` are replaced.
        model_save_path: Prefix placed directly in front of the file name.
        after_episode: Episode number identifying the checkpoint file.

    Returns:
        ``(episode, stockfish_level)`` read from the file, or ``(0, 1)`` when
        no checkpoint file was found.
    """
    checkpoint = f"{model_save_path}{after_episode}.h5"
    if not os.path.exists(checkpoint):
        print("Brak zapisanego stanu treningu.")
        return 0, 1

    print(f"Wczytywanie modelu z pliku: {checkpoint}")
    agent.model = load_model(checkpoint, compile=False)
    agent.model.compile(
        optimizer=tf.keras.optimizers.AdamW(
            learning_rate=LEARNING_RATE,
            weight_decay=WEIGHT_DECAY,
        ),
        loss='mean_squared_error',
    )
    with h5py.File(checkpoint, 'r') as store:
        saved = store.attrs
        agent.tau = saved.get('tau', TAU_MAX)
        episode = saved['episode']
        stockfish_level = saved['stockfish_level']
    print(f"Wczytano stan treningu: epizod {episode}, TAU: {agent.tau}, poziom Stockfisha: {stockfish_level}")
    return episode, stockfish_level

Training loop

In [None]:
def train_agent(agent, episodes=EPISODES, stockfish_path="stockfish/stockfish-windows-x86-64.exe",
               save_interval=SAVE_INTERVAL, model_save_path="saved_model/", load_existing_model=False, stats_path="stats.csv", after_episode=0):
    """Train *agent* as White against a UCI engine.

    Each episode plays one game: the agent moves with MCTS, the engine
    replies with a 0.1 s time limit. Only the agent's moves are stored in
    the replay memory. After each game one replay step is run, a row is
    appended to the stats CSV and a progress line is printed; every
    *save_interval* episodes a checkpoint is written.

    Args:
        agent: The ChessAgent to train.
        episodes: Episode index at which training stops (exclusive).
        stockfish_path: Path of the UCI engine executable.
        save_interval: Save a checkpoint every this many episodes; a falsy
            value disables checkpointing.
        model_save_path: Directory prefix for checkpoints; created if missing.
        load_existing_model: Resume from the checkpoint of *after_episode*.
        stats_path: CSV file receiving one row per episode.
        after_episode: Checkpoint to resume from when loading.
    """
    os.makedirs(model_save_path, exist_ok=True)
    if not os.path.exists(stats_path):
        with open(stats_path, mode='w', newline='') as file:
            csv.writer(file).writerow(['Episode', 'Result', 'Moves', 'Avg Reward', 'TAU'])
    if load_existing_model:
        start_episode, stockfish_level = load_training_state(agent, model_save_path, after_episode)
    else:
        start_episode = 0
        stockfish_level = 1

    engine = chess.engine.SimpleEngine.popen_uci(stockfish_path)
    try:
        engine.configure({"Skill Level": stockfish_level})
        for e in range(start_episode, episodes):
            board = chess.Board()
            done = False
            total_reward = 0
            moves_in_game = 0  # counts the agent's moves only
            previous_material_count = material_count(board)
            while not done:
                if board.turn == chess.WHITE:
                    state = agent.state_to_input(board)
                    action = agent.act_with_mcts_and_softmax(board, time_limit=TIME_LIMIT, num_threads=NUM_THREADS, max_depth=MAX_DEPTH)
                    board.push(action)
                    done = board.is_game_over()
                    next_state = agent.state_to_input(board)
                    reward = get_reward(board, previous_material_count)
                    previous_material_count = material_count(board)
                    total_reward += reward
                    agent.remember(state, action, reward, next_state, done)
                    moves_in_game += 1
                else:
                    # NOTE(review): a game that ends on the engine's move never
                    # stores a terminal transition for the agent.
                    result = engine.play(board, chess.engine.Limit(time=0.1))
                    board.push(result.move)
                    done = board.is_game_over()
            agent.replay()

            # These two values were referenced by the progress line but never defined.
            result_str = board.result()
            avg_reward = total_reward / moves_in_game if moves_in_game else 0.0

            with open(stats_path, mode='a', newline='') as file:
                csv.writer(file).writerow([e + 1, result_str, moves_in_game, avg_reward, agent.tau])
            print(f"Epizod {e+1}/{episodes}, Wynik: {result_str}, Ruchy: {moves_in_game}, "
                  f"Średnia nagroda: {avg_reward:.2f}, TAU: {agent.tau:.4f}, "
                  f"Poziom Stockfisha: {stockfish_level}")

            # Checkpoint under the number of completed episodes so a resumed
            # run starts at the next episode.
            if save_interval and (e + 1) % save_interval == 0:
                save_training_state(agent, e + 1, model_save_path, stockfish_level)
    finally:
        # Always shut the engine subprocess down, including on errors.
        engine.quit()


In [None]:
# Create a fresh agent and train it from scratch with the default settings.
agent = ChessAgent()
train_agent(
    agent,
    episodes=EPISODES,
    model_save_path='saved_model/',
    load_existing_model=False,
)