# Solving Chess Using Deep Q-Learning

In [1]:
import gc

from math import exp
import numpy as np
from random import random, choice

from gym import Env
from gym.spaces import Discrete, Tuple

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

from rl.agents import DQNAgent
from rl.policy import EpsGreedyQPolicy, GreedyQPolicy
from rl.memory import SequentialMemory

import chess
import chess.pgn
import chess.svg
from datetime import datetime

import warnings
warnings.filterwarnings('ignore', category = UserWarning)

## 1. Defining state and action spaces

In [2]:
def chess_positions():
    return [f"{char}{num + 1}" for num in range(8) for char in "abcdefgh"]

def knight_moves():
    positions = chess_positions()
    moves = []
    for position in positions:
        position_alpha_ord = ord(position[0])
        position_digit = int(position[1])
        moves.append(f"{position}{chr(position_alpha_ord + 2)}{min(max(position_digit + 1, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord + 2)}{min(max(position_digit - 1, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord - 2)}{min(max(position_digit + 1, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord - 2)}{min(max(position_digit - 1, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord + 1)}{min(max(position_digit + 2, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord + 1)}{min(max(position_digit - 2, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord - 1)}{min(max(position_digit + 2, 0), 9)}")
        moves.append(f"{position}{chr(position_alpha_ord - 1)}{min(max(position_digit - 2, 0), 9)}")

    return [move for move in moves if move[-2:] in positions]

def bishop_moves():
    def helper(position, positions, x, y):
        position_alpha_ord = ord(position[0])
        position_digit = int(position[1])
        next_pos = f"{chr(position_alpha_ord + x)}{min(max(position_digit + y, 0), 9)}"
        if next_pos in positions:
            return [next_pos] + helper(next_pos, positions, x, y)
        else:
            return []
        
    positions = chess_positions()
    moves = []
    for position in positions:
        moves += [position + next_pos for next_pos in helper(position, positions, 1, 1)]
        moves += [position + next_pos for next_pos in helper(position, positions, 1, -1)]
        moves += [position + next_pos for next_pos in helper(position, positions, -1, 1)]
        moves += [position + next_pos for next_pos in helper(position, positions, -1, -1)]
    return moves

def rook_moves():
    def helper(position, positions, x, y):
        position_alpha_ord = ord(position[0])
        position_digit = int(position[1])
        next_pos = f"{chr(position_alpha_ord + x)}{min(max(position_digit + y, 0), 9)}"
        if next_pos in positions:
            return [next_pos] + helper(next_pos, positions, x, y)
        else:
            return []
    positions = chess_positions()
    moves = []
    for position in positions:
        moves += [position + next_pos for next_pos in helper(position, positions, 1, 0)]
        moves += [position + next_pos for next_pos in helper(position, positions, -1, 0)]
        moves += [position + next_pos for next_pos in helper(position, positions, 0, 1)]
        moves += [position + next_pos for next_pos in helper(position, positions, 0, -1)]
    return moves

def queen_moves():
    return rook_moves() + bishop_moves()

def pawn_promotion_moves():
    def helper(position, positions, x, y):
        position_alpha_ord = ord(position[0])
        position_digit = int(position[1])
        next_pos = f"{chr(position_alpha_ord + x)}{min(max(position_digit + y, 0), 9)}"
        if next_pos in positions:
            return [next_pos] + helper(next_pos, positions, x, y)
        else:
            return []
    positions = chess_positions()
    pawn_positions_white = [alpha + "7" for alpha in "abcdefgh"]
    pawn_positions_black = [alpha + "2" for alpha in "abcdefgh"]
    moves = []
    for position in pawn_positions_white:
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, 0, 1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, 0, 1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, 0, 1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, 0, 1)]
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, 1, 1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, 1, 1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, 1, 1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, 1, 1)]
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, -1, 1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, -1, 1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, -1, 1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, -1, 1)]
    for position in pawn_positions_black:
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, 0, -1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, 0, -1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, 0, -1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, 0, -1)]
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, 1, -1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, 1, -1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, 1, -1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, 1, -1)]
        moves += [position + next_pos + "q" for next_pos in helper(position, positions, -1, -1)]
        moves += [position + next_pos + "r" for next_pos in helper(position, positions, -1, -1)]
        moves += [position + next_pos + "b" for next_pos in helper(position, positions, -1, -1)]
        moves += [position + next_pos + "n" for next_pos in helper(position, positions, -1, -1)]
    
    return moves

def possible_moves():
    # Naive way of determining action space, weeds out invalid actions
    return knight_moves() + queen_moves() + pawn_promotion_moves()

def valid_moves(board):
    return tuple(move.uci() for move in board.legal_moves)

def board_to_state(board):
    """
    FEN (https://en.wikipedia.org/wiki/Forsyth%E2%80%93Edwards_Notation)
    * White pieces are uppercase
    * Black pieces are lowercase

    Consists of 6 fields:
        1. Piece placement (from white's perspective)
        2. Active colour (w: white to move, b: black to move)
        3. Castling availability (-: no castling, KQkq: both can castle, KQ: only white can castle,
                                    kq: only black can castle, Q: only white can castle queenside,
                                    k: only black can castle kingside)
        4. En passant availability (-: no en passant, f6: en passant capture available on f6)
        5. Half-move clock (used for 50 move rule, if reach 50 it is a draw)
        6. Full-move clock (number of moves made by black since start of game)
    """
    fields = board.fen().split(" ")

    # PIECE PLACEMENT
    # Preferable to use one-hot encoding, did not use it because of high space complexity
    # Categorical encoding is used instead, labelled by rewards assigned to each piece
    # One-hot encoding used to separate black and white pieces
    # Nothing: 0
    # Pawn: 1
    # Knight: 3
    # Bishop: 3.15
    # Rook: 4.5
    # Queen: 9
    # King: 1000
    REWARDS = {"P": 1, # pawn
               "N": 3, # knight
               "B": 3.15, # bishop
               "R": 4.5, # rook
               "Q": 9, # queen
               "K": 1000} # king
    fields[0] = fields[0].split("/")[::-1]
    labels = chess_positions()
    pieces = [[0, 0] for _ in range(64)] # list of list[white, black]
    column = 0
    for row in range(len(fields[0])):
        for piece in fields[0][row]:
            if piece.isdigit(): # nothing
                column += int(piece)
            else: # piece
                if piece.isupper(): # white
                    pieces[column][0] = REWARDS[piece.upper()]
                else: # black
                    pieces[column][1] = REWARDS[piece.upper()]
                column += 1
    piece_placements = dict(zip(labels, pieces))

    # ACTIVE COLOUR
    active_colour = [1] if fields[1] == "w" else [0]

    # CASTLING AVAILABILITY
    castling = [1 if "K" in fields[2] else 0,
                1 if "Q" in fields[2] else 0,
                1 if "k" in fields[2] else 0,
                1 if "q" in fields[2] else 0] # [kingside_white, queenside_white, kingside_black, queenside_black]

    # EN PASSANT AVAILABILITY
    # Categorical encoding, by alphabetical label of square (e.g. if square is d6, en_passant = 4)
    # En passant is always on 6th rank
    # No en passant, en_passant = 0
    en_passant = [ord(fields[3][0]) - ord("a") + 1] if fields[3] != "-" else [0]
    
    ##return [[piece for square in piece_placements.values() for piece in square], active_colour, castling, en_passant]
    return [piece for square in piece_placements.values() for piece in square] + active_colour + castling + en_passant

def softmax(lst):
    exp_list = [exp(item) for item in lst]
    return [item / sum(exp_list) for item in exp_list]

def calculate_score(board, white_turn = True):
    score = round(sum(board_to_state(board)[:-6][::2]) - sum(board_to_state(board)[:-6][1::2]), 2)
    if board.is_checkmate():
        if white_turn:
            score += 1000
        else:
            score -= 1000
    return score if white_turn == True else -score

def softmax_opponent_reward(board, white_turn = True, done = False):
    # Calculate softmax reward for black if white_turn = True, and vice-versa
    if done == True:
        return 0.0
    rewards = []
    next_moves = valid_moves(board)
    curr_score = calculate_score(board, white_turn ^ True)
    for move in next_moves:
        board.push(chess.Move.from_uci(move))
        rewards.append(calculate_score(board, white_turn ^ True) - curr_score)
        board.pop()
    if max(rewards) > 100: # exp(1000) leads to overflow
        return max(rewards)
    softmax_rewards = softmax(rewards)
    return sum([rewards[index] * softmax_rewards[index] for index in range(len(rewards))])

## 2. Creating custom gym environment

In [3]:
class ChessEnv(Env):
    def __init__(self, white_player = "Magnum Carlos", black_player = "Magnesium Cars", episode = 0):
        # WORK TO DO: implement param "board" and "white_turn" for __init__() to submit game that is still ongoing
        self.board = chess.Board()
        self.game = chess.pgn.Game()
        self.game.headers["Event"] = "Solving Chess Using Deep Q-Learning"
        self.game.headers["Site"] = "Python"
        self.game.headers["Date"] = datetime.now().date()
        self.game.headers["Round"] = episode
        self.game.headers["White"] = white_player
        self.game.headers["Black"] = black_player
        self.move = self.game
        self.actions = possible_moves()
        self.action_space = Discrete(len(self.actions) + 1) # stalemate/checkmate
        # 7 possibilities for each of the 64 squares (x2 for white and black)
        # Binary for active colour and castling availability
        # 9 possibilities for en passant capture on 6th rank
        # Multidiscrete/Box/Tuple space does not work for keras-rl2: https://stackoverflow.com/questions/70861260/training-dqn-agent-with-multidiscrete-action-space-in-gym
        ##self.observation_space = Tuple([Discrete(7) for _ in range(128)] + [Discrete(2) for _ in range(5)] + [Discrete(9)])
        ##self.observation_space = MultiDiscrete([7 for _ in range(128)] + [2 for _ in range(5)] + [9])
        self.observation_space = Tuple([Discrete(9) for _ in range(len(board_to_state(self.board)))]) # there are some unused observation space, but we should compromise
        self.white_turn = True
        self.episode_length = 400
        
    def board_to_state(self): # custom board to state implemented to flatten observation space
        PIECES = {0: 0, # nothing
                  1: 1, # pawn
                  3: 2, # knight
                  3.15: 3, # bishop
                  4.5: 5, # rook
                  9: 6, # queen
                  1000: 7} # king
        state = board_to_state(self.board)
        state = [PIECES[piece] for piece in state[:-6]] + state[-6:]
        return [[1 if index == item else 0 for index in range(9)] for item in state]
        
    def step(self, action):
        if action == len(self.actions): # stalemate/checkmate
            self.episode_length -= 0.5
            done = self.check_if_done()
            self_reward = 0 - calculate_score(self.board, self.white_turn)
            return tuple(self.board_to_state()), self_reward, done, {} # observation, reward, done, info
        
        action = self.actions[action]
        curr_score = calculate_score(self.board, self.white_turn)
        
        self.board.push(chess.Move.from_uci(action)) # next state
        self.move = self.move.add_variation(chess.Move.from_uci(action))
            
        self_reward = calculate_score(self.board, self.white_turn) - curr_score
        
        self.episode_length -= 0.5 # half move
        done = self.check_if_done()
        
        self_reward -= softmax_opponent_reward(self.board, self.white_turn, done) # take opponent move into consideration
        
        self.white_turn ^= True # pass back turn to opponent
        
        return tuple(self.board_to_state()), self_reward, done, {} # observation, reward, done, info
    
    def render(self, mode = "human"):
        return chess.svg.board(self.board, size = 350)  
    
    def reset(self ,get_last_game = False):
        white_player, black_player, episode = self.game.headers["White"], self.game.headers["Black"], self.game.headers["Round"]
        
        if episode % 5 == 0 and episode > 0:
            if self.game.headers["Result"] != "*":
                print(self.game, file = open(f"games/game_{episode}.pgn", "w"), end = "\n\n")
        
        del self.game
        del self.board
        del self.action_space
        del self.observation_space
        gc.collect()
        if get_last_game:
            episode -= 1
        self.__init__(white_player, black_player, episode + 1)
        return tuple(self.board_to_state())
    
    def check_if_done(self):
        if self.board.is_checkmate():
            self.game.headers["Result"] = "1-0" if self.white_turn is True else "0-1"
            return True
        elif self.board.can_claim_draw() or self.board.is_insufficient_material() or self.board.is_stalemate() or self.episode_length <= 0:
            self.game.headers["Result"] = "1/2-1/2"
            return True
        else:
            return False
        
class CustomChessEnv(ChessEnv):
    def __init__(self, white_player = "Magnum Carlos", black_player = "Magnesium Cars", episode = 0):
        super().__init__(white_player, black_player, episode)
        if episode % 5 == 1: # common openings
            action = "e2e4"
        elif episode % 5 == 2:
            action = "d2d4"
        elif episode % 5 == 3:
            action = "c2c4"
        else:
            allowed_actions = list(valid_moves(self.board))
            allowed_actions.remove("e2e4")
            allowed_actions.remove("d2d4")
            allowed_actions.remove("c2c4")
            action = choice(allowed_actions)
        
        self.board.push(chess.Move.from_uci(action))
        self.move = self.move.add_variation(chess.Move.from_uci(action))
    
    def reset(self, get_last_game = False):
        white_player, black_player, episode = self.game.headers["White"], self.game.headers["Black"], self.game.headers["Round"]
        
        if self.game.headers["Result"] != "*":
            print(self.game, file = open(f"games/game_{episode}.pgn", "w"), end = "\n\n")
        
        del self.game
        del self.board
        del self.action_space
        del self.observation_space
        gc.collect()
        
        if get_last_game:
            episode -= 1
        self.__init__(white_player, black_player, episode + 1)
        return tuple(self.board_to_state())

In [4]:
env = ChessEnv()
observation_space_size = (1, len(env.observation_space), 9)
action_space_size = env.action_space.n

## 3. Creating deep RL model

In [5]:
def create_model(observation_space_size, action_space_size, hidden_layers = 3, nodes = 32):
    inputs = Input(shape = observation_space_size)
    hidden = []
    for _ in range(hidden_layers):
        if len(hidden) == 0:
            hidden.append(Dense(nodes, activation = "relu", kernel_initializer = "he_normal")(inputs))
        else:
            hidden.append(Dense(nodes, activation = "relu", kernel_initializer = "he_normal")(hidden[-1]))
        nodes //= 2 # Decreasing number of nodes increases efficiency
    hidden.append(Flatten()(hidden[-1]))
    outputs = Dense(action_space_size, activation = "linear")(hidden[-1])
    return Model(inputs = inputs, outputs = outputs)

model = create_model(observation_space_size, action_space_size)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 1, 134, 9)]       0         
                                                                 
 dense (Dense)               (None, 1, 134, 32)        320       
                                                                 
 dense_1 (Dense)             (None, 1, 134, 16)        528       
                                                                 
 dense_2 (Dense)             (None, 1, 134, 8)         136       
                                                                 
 flatten (Flatten)           (None, 1072)              0         
                                                                 
 dense_3 (Dense)             (None, 1969)              2112737   
                                                                 
Total params: 2,113,721
Trainable params: 2,113,721
Non-train

## 4. Creating agent

In [6]:
# Constants
DISCOUNT_FACTOR = 0.99
LEARNING_RATE = 0.05
EXPLORATION_RATE = 0.9999
EXPLORATION_DECAY = 0.999999

class EpsilonGreedyPolicy(EpsGreedyQPolicy): # custom policy
    def __init__(self, eps, env):
        super().__init__(eps = eps)
        self.env = env
    
    def select_action(self, q_values):
        assert q_values.ndim == 1
        
        # Invalid action masking
        allowed_actions = valid_moves(self.env.board)
        action_space = self.env.actions
        allowed_q_values = {action_space[index]: q_values[index] for index in range(len(action_space)) if action_space[index] in allowed_actions}

        if len(allowed_actions) == 0:
            return len(action_space) # stalemate/checkmate
        
        if random() < self.eps: # explore, random action
            action = choice(allowed_actions)
        else: # exploit, best action
            action = max(allowed_q_values, key = allowed_q_values.get)
        action = action_space.index(action)
        
        self.eps *= EXPLORATION_DECAY
        return action
    
class GreedyPolicy(GreedyQPolicy):
    def __init__(self, env):
        super().__init__()
        self.env = env
    
    def select_action(self, q_values):
        assert q_values.ndim == 1
        
        # Invalid action masking
        allowed_actions = valid_moves(self.env.board)
        action_space = self.env.actions
        allowed_q_values = {action_space[index]: q_values[index] for index in range(len(action_space)) if action_space[index] in allowed_actions}
        
        if len(allowed_actions) == 0:
            return len(action_space) # stalemate/checkmate
        
        action = max(allowed_q_values, key = allowed_q_values.get)
        action = action_space.index(action)
        return action

def create_agent(model, action_space_size, env):
    policy = EpsilonGreedyPolicy(EXPLORATION_RATE, env)
    test_policy = GreedyPolicy(env)
    memory = SequentialMemory(limit = 100000, window_length = 1) # may want to separate memory into black and white, early and mid and endgame
    # Regarding target_model_update: https://github.com/keras-rl/keras-rl/issues/55
    dqn = DQNAgent(model = model, memory = memory, policy = policy, test_policy = test_policy, nb_actions = action_space_size, nb_steps_warmup = 100, target_model_update = 1e-2, gamma = DISCOUNT_FACTOR)
    return dqn

In [7]:
dqn = create_agent(model, action_space_size, env)
optimizer = Adam(learning_rate = LEARNING_RATE)
dqn.compile(optimizer)
dqn.fit(env, nb_steps = 400000, visualize = False, verbose = 1)
_ = env.reset(get_last_game = True)

Training for 400000 steps ...
Interval 1 (0 steps performed)
28 episodes - episode_reward: -4285.523 [-10795.561, -259.878] - loss: 157979543131341824.000 - mean_q: 3303853289.479

Interval 2 (10000 steps performed)
29 episodes - episode_reward: -4957.612 [-16589.937, -393.690] - loss: 190134992169722707968.000 - mean_q: 236332646400.000

Interval 3 (20000 steps performed)
28 episodes - episode_reward: -5377.110 [-17437.958, -361.314]

Interval 4 (30000 steps performed)
29 episodes - episode_reward: -5310.128 [-17845.523, -356.388]

Interval 5 (40000 steps performed)
30 episodes - episode_reward: -6120.297 [-29434.273, -296.457]

Interval 6 (50000 steps performed)
34 episodes - episode_reward: -4461.119 [-21050.364, -345.225]

Interval 7 (60000 steps performed)
30 episodes - episode_reward: -4714.768 [-19728.640, -250.528]

Interval 8 (70000 steps performed)
32 episodes - episode_reward: -3595.117 [-23442.966, -67.488]

Interval 9 (80000 steps performed)
31 episodes - episode_reward: -

In [8]:
dqn.test(env, nb_episodes = 5, visualize = False)
_ = env.reset(get_last_game = True)

Testing for 5 episodes ...
Episode 1: reward: -22.064, steps: 26
Episode 2: reward: -22.064, steps: 26
Episode 3: reward: -22.064, steps: 26
Episode 4: reward: -22.064, steps: 26
Episode 5: reward: -22.064, steps: 26


## 5. Saving and loading the DQN model

In [9]:
dqn.save_weights('dqn_weights.h5f', overwrite = True)

In [10]:
env = CustomChessEnv(env.game.headers["White"], env.game.headers["Black"], env.game.headers["Round"])
observation_space_size = (1, len(env.observation_space), 9)
action_space_size = env.action_space.n
model = create_model(observation_space_size, action_space_size)
dqn = create_agent(model, action_space_size, env)
optimizer = Adam(learning_rate = LEARNING_RATE)
dqn.compile(optimizer)
dqn.load_weights('dqn_weights.h5f')

In [11]:
dqn.test(env, nb_episodes = 5, visualize = True)
_ = env.reset(get_last_game = True)

Testing for 5 episodes ...
Episode 1: reward: -5.277, steps: 29
Episode 2: reward: -0.753, steps: 28
Episode 3: reward: -2.949, steps: 23
Episode 4: reward: -0.803, steps: 28
Episode 5: reward: 0.000, steps: 15


## 6. Results
Though I did not tune the various hyperparameters, what was certain was that deep q-learning was especially bad at solving chess, in which the main reason lies in stability. Without a stable opponent, it is unable to determine whether a move was good or bad. Furthermore, with the analysis of a particular game my model has played (https://www.chess.com/analysis/game/pgn/h1TkS1t1U), we can see that the agent is unable to take advantage of the opponent's blunders and kept on "sacrificing" pieces for no material or positional advantage. Moreover, it is unable to play endgames well, though it might if we were to create 3 separate agents to play openings, middlegame and endgame separately. The agent is also performing quite badly as seen from the agent drawing winnable games through repetition or going through 50 moves without a capture.

Based on the large loss and mean_q values produced by the model, it could be due to the exploding gradients problem. We can solve this using through clipping (passing a clipvalue/clipnorm argument to our optimizer), but I believe this problem is second to instability. The problem of stability can still be solved if the model was playing against a stable agent like Stockfish.

Though, if there is anything that my agent learnt how to do, it was to make quick draws like super GMs do. Also it knows how to develop the knights first.