# Reinforcement Learning - Deep Q-Learning for Racing Kings

In [50]:
!pip install numpy
!pip install tensorflow==2.3.0
!pip install keras
!pip install keras-rl2
!pip install chess



In [51]:
import numpy as np
import tensorflow as tf
import datetime
from statistics import mean
from racing_kings_env_armin import RacingKingsEnvironment

## Initial Test of the Environment
To make sure the environment runs properly, a few episodes of gameplay with randomly sampled actions are executed.

In [52]:
# Smoke test: run a few episodes with randomly sampled actions and report
# the accumulated reward plus the info dict returned by the last step.
episodes = 10
env = RacingKingsEnvironment()
for episode in range(1, episodes + 1):
    env.reset()
    sum_of_rewards = 0
    done = False

    while not done:
        env.render(mode=None)
        # The action is drawn from the action space without checking legality.
        sampled_action = env.action_space.sample()
        observation, reward, done, information = env.step(sampled_action)
        sum_of_rewards += reward
    print('Episode:{} Sum of Rewards:{} Information:{}'.format(episode, sum_of_rewards, information))
env.close()

Episode:1 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:2 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:3 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:4 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:5 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:6 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:7 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:8 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:9 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
Episode:10 Sum of Rewards:0 Information:{'msg': 'Action is not a valid move'}
closing


## Building the Convolutional Neural Network (CNN)
For better compatibility with the task of Racing Kings, a custom CNN is created using Keras.
The kernel size (convolution window size) is chosen according to the publication that was given with the task.

In [53]:
class CustomModel(tf.keras.Model):
    """Convolutional Q-value network built from Keras layers.

    Architecture: an input layer, two 3x3 Conv2D layers with 256 filters and
    ReLU activation, a Flatten layer and a linear Dense layer with
    ``shape_actions`` outputs.

    Args:
        shape_states: shape of a single (unbatched) state, passed to InputLayer.
        shape_actions: number of output units of the final Dense layer.
    """

    def __init__(self, shape_states, shape_actions):
        super().__init__()
        # Attribute names are kept as-is on purpose: object-based weight
        # checkpoints refer to layers through these names.
        self.input_layer = tf.keras.layers.InputLayer(input_shape=shape_states)
        # Two identical convolutional layers, created in order.
        self.hidden_layers = [
            tf.keras.layers.Conv2D(
                filters,
                kernel_size=(3, 3),
                activation='relu',
                kernel_initializer='RandomNormal')
            for filters in [256, 256]
        ]
        # Flatten the feature maps so that they can feed the Dense head.
        self.flatten_layer = tf.keras.layers.Flatten()
        # One linear output per action.
        self.output_layer = tf.keras.layers.Dense(
            shape_actions, activation='linear', kernel_initializer='RandomNormal')

    @tf.function
    def call(self, inputs):
        """Run a forward pass and return the output of the Dense head."""
        features = self.input_layer(inputs)
        for conv_layer in self.hidden_layers:
            features = conv_layer(features)
        return self.output_layer(self.flatten_layer(features))

## DQN class
To make it easier to create the target and training models, a custom class is defined. It stores the hyperparameter gamma, which represents the weight of future rewards.

In [54]:
class DeepQNetwork:
    """Q-network wrapper holding a CustomModel, an Adam optimizer and a replay buffer.

    Args:
        shape_states: shape of a single state (compared against input shapes in ``predict``).
        shape_actions: number of actions (size of the model output / one-hot depth).
        gamma: discount factor applied to the target network's best next-state value.
        max_number_experiences: buffer size at which the oldest entry is dropped.
        min_number_experiences: buffer size below which ``train`` does nothing.
        batch_size: number of experiences sampled per training step.
        lr: learning rate handed to the Adam optimizer.
    """

    def __init__(   self,
                    shape_states,
                    shape_actions,
                    gamma,
                    max_number_experiences,
                    min_number_experiences,
                    batch_size,
                    lr ):
        self.shape_states = shape_states
        self.shape_actions = shape_actions
        self.gamma = gamma
        self.batch_size = batch_size
        self.max_number_experiences = max_number_experiences
        self.min_number_experiences = min_number_experiences
        self.optimizer = tf.optimizers.Adam(lr)
        self.model = CustomModel(shape_states, shape_actions)
        # Replay buffer: one parallel list per field of an experience.
        self.experience = {
            field: []
            for field in ('states', 'actions', 'rewards', 'states_next', 'done')
        }

    def predict(self, inputs):
        """Return the model output for ``inputs`` (cast to float32).

        An input whose shape equals ``shape_states`` is treated as a single
        state and gets a leading batch axis; anything else is passed through.
        """
        is_single_state = inputs.shape == self.shape_states
        batch = np.expand_dims(inputs, axis = 0) if is_single_state else inputs
        return self.model(batch.astype('float32'))

    def train(self, TargetNet):
        """Run one gradient step on a random batch from the replay buffer.

        Args:
            TargetNet: network whose ``predict`` supplies the next-state values.

        Returns:
            The integer 0 if the buffer holds fewer than
            ``min_number_experiences`` entries, otherwise the loss tensor.
        """
        stored = len(self.experience['states'])
        if stored < self.min_number_experiences:
            return 0

        # Draw batch_size buffer indices (with replacement).
        ids = np.random.randint(low=0, high=stored, size=self.batch_size)

        def gather(field):
            # Collect the sampled entries of one buffer field into an array.
            column = self.experience[field]
            return np.asarray([ column[i] for i in ids ])

        states = gather('states')
        actions = gather('actions')
        rewards = gather('rewards')
        states_next = gather('states_next')
        dones = gather('done')

        # Best predicted value of each next state according to the target network.
        value_next = np.max(TargetNet.predict(states_next), axis=1)

        # Finished transitions use the plain reward, all others add the
        # discounted next-state value.
        bootstrapped = rewards + self.gamma * value_next
        actual_values = np.where(dones, rewards, bootstrapped)

        with tf.GradientTape() as tape:
            # Keep only the output belonging to the action that was taken.
            action_mask = tf.one_hot(actions, self.shape_actions)
            selected_action_values = tf.math.reduce_sum(
                self.predict(states) * action_mask, axis=1)
            # Mean squared difference between targets and predictions.
            loss = tf.math.reduce_mean(tf.square(actual_values - selected_action_values))

        variables = self.model.trainable_variables
        gradients = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))
        return loss

    def get_action(self, states, epsilon):
        """Pick an action: random with probability ``epsilon``, else the argmax of the prediction."""
        explore = np.random.random() < epsilon
        if explore:
            return np.random.choice(self.shape_actions)
        return np.argmax(self.predict(states))

    def add_experience(self, experience_to_add):
        """Append one experience dict to the buffer, dropping the oldest entry when full."""
        buffer_full = len(self.experience['states']) >= self.max_number_experiences
        if buffer_full:
            for column in self.experience.values():
                del column[0]
        for field, value in experience_to_add.items():
            self.experience[field].append(value)

    def copy_trainable_variables(self, network_to_copy_from):
        """Assign the other network's trainable variables to this network's, pairwise in order."""
        variable_pairs = zip(
            self.model.trainable_variables,
            network_to_copy_from.model.trainable_variables)
        for own_var, other_var in variable_pairs:
            own_var.assign(other_var.numpy())

In [55]:
# gameplay for the training loop
def play_racing_kings(env, TrainNet, TargetNet, epsilon, copy_step):
    """Play one episode, training TrainNet after every step.

    Args:
        env: environment providing ``reset()`` and ``step(action)``.
        TrainNet: network that selects actions, stores experiences and is trained.
        TargetNet: network passed to ``TrainNet.train``; refreshed from TrainNet
            every ``copy_step`` steps.
        epsilon: exploration rate forwarded to ``TrainNet.get_action``.
        copy_step: number of steps between weight copies into TargetNet.

    Returns:
        Tuple ``(sum of rewards, mean of the per-step losses, number of steps)``.
    """
    observations = env.reset()
    total_reward = 0
    step_count = 0
    losses = []
    done = False
    while not done:
        prev_observations = observations
        action = TrainNet.get_action(prev_observations, epsilon)
        observations, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            env.reset()

        TrainNet.add_experience({
            'states': prev_observations,
            'actions': action,
            'rewards': reward,
            'states_next': observations,
            'done': done,
        })
        loss = TrainNet.train( TargetNet )
        # train() yields a plain int when it skipped the update, otherwise an
        # object whose value is read through .numpy().
        losses.append(loss if isinstance(loss, int) else loss.numpy())

        step_count += 1
        if step_count % copy_step == 0:
            TargetNet.copy_trainable_variables( TrainNet )
    return total_reward, mean(losses), step_count

In [56]:
# this training loop is explained in the documentation
env = RacingKingsEnvironment()

# Hyperparameters
gamma = 0.99                        # discount factor handed to both networks
copy_step = 5                       # steps between TrainNet -> TargetNet weight copies
max_number_of_experiences = 10000   # replay buffer capacity
min_number_of_experiences = 100     # buffer size required before training starts
batch_size = 32
lr = 1e-2

# TensorBoard log directory, tagged with the start time of this run
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = 'Training/Logs/CustomDQN-' + current_time + '-min_epsilon-025-decay-1e-5_no-neg-reward'
summary_writer = tf.summary.create_file_writer(log_dir)

TrainNet = DeepQNetwork(    env.state_shape,
                            env.action_shape,
                            gamma,
                            max_number_of_experiences,
                            min_number_of_experiences,
                            batch_size,
                            lr)
TargetNet = DeepQNetwork(   env.state_shape,
                            env.action_shape,
                            gamma,
                            max_number_of_experiences,
                            min_number_of_experiences,
                            batch_size,
                            lr)

N = 10000               # number of training episodes
StorageInterval = 500   # number of episodes averaged per log entry
total_episode_rewards = np.empty(StorageInterval)
total_episode_lengths = np.empty(StorageInterval)
total_episode_losses = np.empty(StorageInterval)
# starting epsilon: at the beginning of training 99 % of randomness are allowed
epsilon = 0.99
# sets the speed epsilon decreases to min epsilon
decay = 1 - 1e-5
# lower bound for epsilon: exploration never drops below 25 % random actions
min_epsilon = 0.25
for n in range(N):
    epsilon = max(epsilon * decay, min_epsilon)

    reward, loss, step = play_racing_kings(env, TrainNet, TargetNet, epsilon, copy_step)
    slot = n % StorageInterval
    total_episode_rewards[slot] = reward
    total_episode_lengths[slot] = step
    total_episode_losses[slot] = loss

    # Report once a window is completely filled, i.e. after its LAST episode.
    # Checking `n % StorageInterval == 0` instead would report right after
    # slot 0 had been overwritten by the next window's first episode and would
    # never report the final window. The last episode also triggers a report
    # so that a trailing partial window is not lost.
    window_complete = slot == StorageInterval - 1
    if window_complete or n == N - 1:
        filled = slot + 1   # number of valid entries in the current window
        avg_episode_rewards = np.mean(total_episode_rewards[:filled])
        avg_episode_length = np.mean(total_episode_lengths[:filled])
        avg_episode_losses = np.mean(total_episode_losses[:filled])
        with summary_writer.as_default():
            tf.summary.scalar('mean_ep_length', avg_episode_length, step=n)
            tf.summary.scalar('mean_reward', avg_episode_rewards, step=n)
            tf.summary.scalar('loss', avg_episode_losses, step=n)
            tf.summary.scalar('epsilon', epsilon, step=n)
        print('episode:{} epsilon:{:.3} mean_reward:{:.3} mean_ep_length:{:.3} loss:{:.3}'
                .format(n, float(epsilon), float(avg_episode_rewards), float(avg_episode_length), float(avg_episode_losses)))
env.close()

Play some games with the freshly trained Model. You can load other models with the commands below.

In [None]:
# Let the trained network play a few games greedily (epsilon = 0) and
# print every chosen move in UCI notation.
episodes = 10
for episode in range(1, episodes + 1):
    # obs = env.reset()
    env.board.reset()
    # NOTE(review): stepping with None is presumably how the environment hands
    # out the observation of the freshly reset board - confirm in the env.
    obs, _, _, _ = env.step(None)
    score = 0
    done = False

    while not done:
        env.render(mode="human")
        action = TrainNet.get_action(obs, 0)
        move_uci = env.action_index_to_uci(action)
        print(">>{}".format(move_uci))
        obs, reward, done, info = env.step(action)
        score += reward
    print('Episode:{} Score:{} Info:{}'.format(episode, score, info))

## Save and Reload the Model Weights

If you want to try loading and executing a model, the 250k model with the relative path ***train_250k_net-025-1e-5_stp_rwd-0.h5f*** works quite well and is also mentioned in the documentation.

In [None]:
# Persist the weights of both networks, overwriting existing files.
# NOTE(review): '.h5f' is not one of the suffixes Keras maps to HDF5, so this
# presumably writes TensorFlow checkpoint files - confirm against the Keras docs.
for network, weights_path in (
        (TrainNet, 'Training/SavedModels/train_10k_net-025-1e-5_stp_rwd-0.h5f'),
        (TargetNet, 'Training/SavedModels/target_10k_net-025-1e-5_stp_rwd-0.h5f')):
    network.model.save_weights(weights_path, overwrite=True)

Delete the current models

In [None]:
# Drop the model attribute from both wrappers; a model has to be assigned
# again before either network can predict or train.
del TrainNet.model, TargetNet.model

Load the model weights from disk

In [None]:
# Build a fresh model with the same shapes and restore the saved 250k weights into it.
restored_train_model = CustomModel(TrainNet.shape_states, TrainNet.shape_actions)
restored_train_model.load_weights('Training/SavedModels/train_250k_net-025-1e-5_stp_rwd-0.h5f')
TrainNet.model = restored_train_model

# The target network is left commented out: for the 250k run only the train net was saved.
# TargetNet.model = CustomModel(TargetNet.shape_states, TargetNet.shape_actions)
# TargetNet.model.load_weights('Training/SavedModels/target_250k_net-025-1e-5_stp_rwd-0.h5f')

# Play Game Human vs AI

In [None]:
# Human (White) plays against the trained network (Black).
# Create environment from class RacingKings
env = RacingKingsEnvironment()
# Define necessary variables
obs = None
done = False
score = 0
# Reset chess board to ensure fresh start
env.board.reset()
# Start game loop
while not done:
    # Activate render mode
    env.render()
    # Check if turn is white or black (Human plays white)
    if env.who(env.board.turn) == 'White':
        # Print out formatted list of possible moves at each step as a reminder for human player
        print('Gueltige Züge:')
        legal_moves = list(env.board.legal_moves)
        legal_moves = map(lambda move: move.uci(), legal_moves)
        print(*legal_moves, sep = ", ")
        # Ask for human step via input of UCI move
        action = env.action_uci_to_index(input ('Ihr nächster Zug:'))
        # Make step (Parameter 'True' to trigger game functionality in environment)
        obs, reward, done, info = env.step(action,True)
        score+=reward

    else:
        # Let the trained network choose greedily (epsilon = 0), exactly as in
        # the evaluation cell. No `model` object exists in this notebook, so
        # calling `model.predict(obs)` would raise a NameError; get_action
        # already returns a single action index, so no [0] indexing is needed.
        # obs is never None here because White (the human) moves first.
        action = TrainNet.get_action(obs, 0)
        obs, reward, done, info = env.step(action, True)
        score+=reward
# End game loop

# Render last state of chess board after game ends
env.render()
# Print out game info
print('Score:{} Info:{}'.format(score, info))
# Closing environment
env.close()