In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import spaces

import chess.engine
import chess.pgn

import sys
sys.path.append("../build")
import chess_engine as ce
from chess_engine import Board, Generator, print_board, Color, State, Piece

In [2]:
class ChessEnv(gym.Env):
    def __init__(self):
        super(ChessEnv, self).__init__()
        NUM_ROWS = 8
        NUM_COLS = 8
        NUM_ACTIONS = 40 # Number of legal moves

        self.MIN_EVAL = -10000
        self.MAX_EVAL = 10000

        # Define action and observation space
        self.action_space = spaces.Discrete(NUM_ACTIONS)
        self.observation_space = spaces.Box(low=self.MIN_EVAL, high=self.MAX_EVAL, shape=(NUM_ROWS, NUM_COLS, 12))
        self.generator = Generator()
        self.board = Board()
        self.generator.generate_legal_moves(self.board)
        self.engine = chess.engine.SimpleEngine.popen_uci("/opt/homebrew/bin/stockfish")
        self.legal_moves = self.generator.get_legal_moves()

        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)
        # Reset the environment to initial state
        self.board.reset()
        self.generator.generate_legal_moves(self.board)
        self.NUM_ACTIONS = self.generator.get_n_legal_moves()
        self.legal_moves = self.generator.get_legal_moves()
        self.action_space = spaces.Discrete(self.NUM_ACTIONS)
        
        return self.get_observation(), {}
    
    def step(self, action):
        # Take action and return next state, reward, done, info
        self.board.make_move(action)

        self.generator.generate_legal_moves(self.board) # Updates state of the passed board object
        self.legal_moves = self.generator.get_legal_moves()
        done = (self.board.get_state() != State.PLAY) or self.generator.get_n_legal_moves() == 0 # Completely winning position (+20 pawns, 2000 centipawns)

        if not done:
            # Make black take a step, so the agent will only ever play white hence it will become good at maximising the score
            # Play a random move
            self.board.make_move(np.random.choice(self.legal_moves, 1))
            self.generator.generate_legal_moves(self.board) # Updates state of the passed board object
            self.legal_moves = self.generator.get_legal_moves()
            done = (self.board.get_state() != State.PLAY) or self.generator.get_n_legal_moves() == 0

        board = chess.Board(self.board.get_fen())
        analysis = self.engine.analyse(board, chess.engine.Limit(time=0.15))
        reward = analysis["score"].white().score(mate_score=self.MAX_EVAL)

        self.NUM_ACTIONS = self.generator.get_n_legal_moves()
        if self.NUM_ACTIONS == 0:
            self.NUM_ACTIONS = 1 # "done" command now issued anyway so this wont matter
        self.action_space = spaces.Discrete(self.NUM_ACTIONS)

        return self.get_observation(), reward, done, False, {}
    
    def get_observation(self):
        # Get the current observation/state of the environment
        return self.board
    
    def close(self):
        self.engine.quit() # Cleanup memory from stockfish
        del self.board
        del self.generator

#  Basic Test That Env Works
First check my environment I just made is working is a reasonable fashion.

In [3]:
env = ChessEnv()
#observation = env.reset()
#action = env.action_space.sample()
#observation, reward, done, info = env.step(env.legal_moves[action])

# Training an Agent

In [4]:
from collections import defaultdict
from tqdm import tqdm

class ChessAgent:
    def __init__(self, 
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95
    ):
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def get_action(self, obs: tuple[int, int, bool]) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return env.action_space.sample()

        # with probability (1 - epsilon) act greedily (exploit)
        else:
            return int(np.argmax(self.q_values[obs]))
        
    def update(
        self,
        obs: tuple[int, int, bool],
        action: int,
        reward: float,
        terminated: bool,
        next_obs: tuple[int, int, bool],
    ):
        """Updates the Q-value of an action."""
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [5]:
# hyperparameters
learning_rate = 0.01
n_episodes = 100
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1

agent = ChessAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

In [6]:
def bitboard_to_array(bitboard):
    # Initialize an empty 8x8 numpy array
    array = np.zeros((8, 8), dtype=int)
    # Iterate over each square on the board
    for square in range(64):
        # Check if the corresponding bit in the bitboard is set
        if bitboard & (1 << square):
            # Determine the row and column index for the current square
            row = 7 - (square // 8)  # Invert row index to match array indexing
            col = square % 8
            # Set the value in the numpy array to 1
            array[row, col] = 1
    return array

def board_to_obs(board):
    obs = np.zeros((8, 8, 12), dtype=int)
    pieces = [Piece.PAWN, Piece.KNIGHT, Piece.BISHOP, Piece.ROOK, Piece.QUEEN, Piece.KING]
    colors = [Color.WHITE, Color.BLACK]
    # Iterate through each colour
    for color in colors:
        color_index = 0
        if color == Color.BLACK:
            color_index = 6 
        # Iterate over all types of pieces
        for i, piece in enumerate(pieces):
            bitboard = board.get_board_color_piece(color, piece)
            array = bitboard_to_array(bitboard)
            bitboard_index = i + color_index
            obs[:, :, bitboard_index] = array
    return obs

In [7]:
#env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)

In [7]:
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False

    # play one episode
    while not done:
        # Convert the board "obs" to an 8 x 8 x 12 array i.e. all the bitboards..
        #obs = board_to_obs(obs)
        obs = obs.get_hash()
        action = agent.get_action(obs)
        take_action = env.legal_moves[action]
        next_obs, reward, terminated, truncated, info = env.step(take_action)
    
        # update the agent
        agent.update(obs, action, reward, terminated, next_obs.get_hash())
        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs
    agent.decay_epsilon()

  self.board.make_move(np.random.choice(self.legal_moves, 1))
 25%|██▌       | 25/100 [10:08<30:24, 24.33s/it]


KeyboardInterrupt: 

In [None]:
"""
b = Board()
agent.epsilon = 0

for i in range(10):
    # 10 moves 5 by white, 5 by black
    obs = b.get_hash()
    g = Generator()
    g.generate_legal_moves(b)
    best = 0

    if i % 2 == 0: # white to move
        best = agent.get_action(obs)

    else: # black (random) agent
        best = np.random.randint(0, g.get_n_legal_moves())

    b.make_move(g.get_legal_moves()[best])
    print_board(b)
"""

'\nb = Board()\nagent.epsilon = 0\n\nfor i in range(10):\n    # 10 moves 5 by white, 5 by black\n    obs = b.get_hash()\n    g = Generator()\n    g.generate_legal_moves(b)\n    best = 0\n\n    if i % 2 == 0: # white to move\n        best = agent.get_action(obs)\n\n    else: # black (random) agent\n        best = np.random.randint(0, g.get_n_legal_moves())\n\n    b.make_move(g.get_legal_moves()[best])\n    print_board(b)\n'