In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from stockfish import Stockfish
import chess

In [2]:
stockfish = Stockfish("./stockfish/stockfish_14_win_x64_avx2/stockfish_14_x64_avx2.exe", parameters={"Skill Level": 1})

In [3]:
gamma = 0.99  # Discount factor for past rewards
epsilon = 1  # Epsilon greedy parameter
epsilon_min = 0.1  # Minimum epsilon greedy parameter
epsilon_max = 1  # Maximum epsilon greedy parameter
epsilon_interval = (
    epsilon_max - epsilon_min
)  # Rate at which to reduce chance of random action being taken
batch_size = 32  # Size of batch taken from replay buffer
max_steps_per_episode = 10000


num_actions = 4096

In [4]:
move_indices = []

for from_square in range(64):
    for to_square in range(64):
        move_indices.append(chess.Move(from_square, to_square))

piece_dict = {
    'p' : [1,0,0,0,0,0,0,0,0,0,0,0],
    'P' : [0,0,0,0,0,0,1,0,0,0,0,0],
    'n' : [0,1,0,0,0,0,0,0,0,0,0,0],
    'N' : [0,0,0,0,0,0,0,1,0,0,0,0],
    'b' : [0,0,1,0,0,0,0,0,0,0,0,0],
    'B' : [0,0,0,0,0,0,0,0,1,0,0,0],
    'r' : [0,0,0,1,0,0,0,0,0,0,0,0],
    'R' : [0,0,0,0,0,0,0,0,0,1,0,0],
    'q' : [0,0,0,0,1,0,0,0,0,0,0,0],
    'Q' : [0,0,0,0,0,0,0,0,0,0,1,0],
    'k' : [0,0,0,0,0,1,0,0,0,0,0,0],
    'K' : [0,0,0,0,0,0,0,0,0,0,0,1],
    '.' : [0,0,0,0,0,0,0,0,0,0,0,0],
}

piece_value_dict = {
    'p' : -1,
    'P' : 1,
    'n' : -3,
    'N' : 3,
    'b' : -3,
    'B' : 3,
    'r' : -5,
    'R' : 5,
    'q' : -9,
    'Q' : 9
}

In [5]:
def board_to_matrix(board):
    lines = str(board).split("\n")
    matrix = []

    for line in lines:
        split_line = line.split(' ')
        matrix_line = list(map(lambda x: piece_dict[x], split_line))
        matrix.append(matrix_line)
    
    return matrix

def filter_legal_moves(move_probs, chess_board):
    filter_mask = np.zeros(move_probs.shape)
    
    legal_moves = chess_board.legal_moves
    for move in legal_moves:
        filter_mask[0][64*move.from_square + move.to_square] = 1

    return move_probs*filter_mask

def count_material(chess_board):
    board_string = str(chess_board)
    material_value = 0

    for k,v in piece_value_dict.items():
        material_value += board_string.count(k) * v

    return material_value



        

In [6]:

def create_q_model():
    # Network defined by the Deepmind paper
    inputs = layers.Input(shape=(8, 8, 12))
    initializer = tf.keras.initializers.HeNormal()

    # Convolutions on the frames on the screen
    # layer1 = layers.Conv2D(32, 2, strides=(2, 2), activation="relu")(inputs)
    # layer2 = layers.Conv2D(64, 2, strides=(2, 2), activation="relu")(layer1)
    # layer3 = layers.Conv2D(64, 2, strides=(2, 2), activation="relu")(layer2)

    
    layer1 = layers.Dense(64, activation="relu", kernel_initializer=initializer)(inputs)
    layer2 = layers.Dense(128, activation="relu", kernel_initializer=initializer)(layer1)
    layer3 = layers.Flatten()(layer2)

    action = layers.Dense(num_actions, activation="linear")(layer3)

    return keras.Model(inputs=inputs, outputs=action)     

In [7]:

model = create_q_model()
model_target = create_q_model()

In [8]:

optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

In [9]:
# Experience replay buffers
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
move_count = 0
# Number of frames to take random action and observe output
# epsilon_random_frames = 500
# Number of frames for exploration
epsilon_greedy_moves = 10000
# Maximum replay length
# Note: The Deepmind paper suggests 1000000 however this causes memory issues
max_memory_length = 1000
# Train the model after 4 actions
update_after_moves = 4
# How often to update the target network
update_target_network = 100
# Using huber loss for stability
loss_function = keras.losses.Huber()

living_reward = 0.001
material_weight = 1
checkmate_weight = 100
num_games = 1000

game_counter = 0
win_counter = 0
loss_counter = 0


play_as_white = False



In [10]:
while game_counter < num_games:  # Run games

    game_counter += 1

    # init stockfish
    stockfish = Stockfish("./stockfish/stockfish_14_win_x64_avx2/stockfish_14_x64_avx2.exe")

    # init board
    fish_board = stockfish.get_fen_position()
    chess_board = chess.Board(fish_board)

    episode_reward = 0
    material_value = 0
    play_as_white = not play_as_white

    stockfish_to_move = play_as_white

    done = False

    episode_move_counter = -1

    while True: # play

        # print(stockfish.get_board_visual())
        # print(chess_board)

        stockfish_to_move = not stockfish_to_move

        episode_move_counter += 1

        if stockfish_to_move:
            try:

                best_move = stockfish.get_best_move()
                chess_board.push(chess.Move.from_uci(best_move))
                stockfish.make_moves_from_current_position([best_move])

            except:
                break
                

                    
            if chess_board.is_game_over():
                if chess_board.outcome().termination == chess.Termination.CHECKMATE:
                    loss_counter += 1
                done_history[-1] = True
                break

            continue
        


        # transform board state to matrix
        board_matrix = np.array(board_to_matrix(chess_board))

        move = None
        move_num = 0

        # Use epsilon-greedy for exploration
        random_number = np.random.rand(1)[0]
        if epsilon > random_number:
        # if False:
            # Take random action
            move = np.random.choice(list(chess_board.legal_moves))

            from_square = move.from_square
            to_square = move.to_square

            move_num = 64*from_square + to_square
        else:


            # Predict action Q-values
            # From environment state
            state_tensor = tf.convert_to_tensor(board_matrix)
            state_tensor = tf.expand_dims(state_tensor, 0)
            move_probs = model(state_tensor, training=False)
            # Take best action
            move_probs = filter_legal_moves(move_probs, chess_board)
            move_num = tf.argmax(move_probs[0]).numpy()

        # Decay probability of taking random action
        epsilon -= epsilon_interval / epsilon_greedy_moves
        epsilon = max(epsilon, epsilon_min)

        # Push old state to hisotory
        state_history.append(board_matrix)

        # transform move_num to move, add promotion if possible
        if not move:
            promotion = None
            from_square = move_num // 64
            to_square = move_num % 64
            if from_square > 47 and from_square < 56 and chess_board.piece_at(from_square) == 'P':
                promotion = 5
            if from_square > 7 and from_square < 16 and chess_board.piece_at(from_square) == 'p':
                promotion = 5
            move = chess.Move(from_square, to_square, promotion)

        # Apply the sampled move in our environment
        chess_board.push(move)
        stockfish.make_moves_from_current_position([str(move)])
        move_count += 1

        # Calcualte reward
        old_material_value = material_value
        material_value = count_material(chess_board)
        reward = (material_value - old_material_value) * (1 if play_as_white else -1) * material_weight

        if chess_board.is_checkmate():
            win_counter += 1
            reward += checkmate_weight


        if chess_board.is_game_over():
            done = True

        reward += living_reward


        # Update history
        action_history.append(move_num)
        next_board_matrix = np.array(board_to_matrix(chess_board))
        state_next_history.append(next_board_matrix)
        done_history.append(done)
        rewards_history.append(reward)

        episode_reward += reward



        # Update every fourth move and once batch size is over 32
        if move_count % update_after_moves == 0 and len(done_history) > batch_size:

            # Get indices of samples for replay buffers
            indices = np.random.choice(range(len(done_history)), size=batch_size)

            # Using list comprehension to sample from replay buffer
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = tf.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # Build the updated Q-values for the sampled future states
            # Use the target model for stability
            future_rewards = model_target.predict(state_next_sample)
            # Q value = reward + discount factor * expected future reward
            updated_q_values = rewards_sample + gamma * tf.reduce_max(
                future_rewards, axis=1
            )

            # If final frame set the last value to -1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # Create a mask so we only calculate loss on the updated Q-values
            masks = tf.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # Train the model on the states and updated Q-values
                q_values = model(state_sample)

                # Apply the masks to the Q-values to get the Q-value for action taken
                q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                # Calculate loss between new Q-value and old Q-value
                loss = loss_function(updated_q_values, q_action)

            # Backpropagation
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if move_count % update_target_network == 0:
            # update the target network with new weights
            model_target.set_weights(model.get_weights())
            # Log details
            template = "running reward: {:.2f} at episode {}, move count {}"
            print(template.format(running_reward, episode_count, move_count))

        # Limit the state and reward history
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if chess_board.is_game_over():
            break

    # Update running reward to check condition for solving
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    print(f"Game {episode_count} ended, wins: {win_counter}, loses: {loss_counter}, draws: {game_counter - win_counter - loss_counter}.")


Game 1 ended, wins: 0, loses: 1, draws: 0.
Game 2 ended, wins: 0, loses: 2, draws: 0.
Game 3 ended, wins: 0, loses: 3, draws: 0.
Game 4 ended, wins: 0, loses: 4, draws: 0.
Game 5 ended, wins: 0, loses: 5, draws: 0.
Game 6 ended, wins: 0, loses: 6, draws: 0.
running reward: -12.32 at episode 6, move count 100
Game 7 ended, wins: 0, loses: 7, draws: 0.
Game 8 ended, wins: 0, loses: 8, draws: 0.
Game 9 ended, wins: 0, loses: 9, draws: 0.
Game 10 ended, wins: 0, loses: 10, draws: 0.
Game 11 ended, wins: 0, loses: 11, draws: 0.
Game 12 ended, wins: 0, loses: 12, draws: 0.
running reward: -16.73 at episode 12, move count 200
Game 13 ended, wins: 0, loses: 13, draws: 0.
Game 14 ended, wins: 0, loses: 14, draws: 0.
Game 15 ended, wins: 0, loses: 15, draws: 0.
Game 16 ended, wins: 0, loses: 16, draws: 0.
Game 17 ended, wins: 0, loses: 16, draws: 1.
running reward: -18.34 at episode 17, move count 300
Game 18 ended, wins: 0, loses: 17, draws: 1.
Game 19 ended, wins: 0, loses: 18, draws: 1.
Game 

In [None]:
model.save('models/v4')


INFO:tensorflow:Assets written to: models/v3\assets


In [None]:

model.save_weights('./model_weights/v4')