### Reinforcement Learning code

This code contains the Qlearn routine which is used for training neural networks to play games using reinforcement learning.
The two games I've implemented with this are Connect 4 and Noughts and Crosses, which are found in the script GameCode.ipynb.
This script also contains routines for pitting two networks against each other in evaluation games (i.e. after learning), as well as a routine for letting a human play against a network.

In [8]:
#Function for training two networks against each other using deep Q-learning.
#Here p1 and p2 should be neural networks (i.e. objects from the Network class in Neural Networks.ipynb).
#These could either be newly created networks or pre-existing networks that you want to re-train.
#We can also choose to enter the string "random" for either player to train the network against a random opponent.

#Our other parameters are:
#total_games: total number of games to be played during training; here the networks are updated once per move;
#eta: learning rate for networks;
#gamma: discounting factor (1 = no discounting, 0 = full discounting);
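#evaluate: this is a flag that specifies whether to evaluate our networks against a random opponent at every 10% training interval;
#save_stats: this is a flag that specifies whether to record and return training statistics (replay games, running scores and scores against a random opponent);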
#lmbda: L2 regularisation hyperparameter for weight regularisation of neural networks;

def _qlearn_update(player, state, chosen_action, reward, next_state, eta, gamma, lmbda, game, standard_reward):
    """Perform a single Q-learning gradient-descent step on player for one experience.

    state is the flattened (1-D) board that player saw when it picked
    chosen_action. next_state is the flattened board player will act on next,
    or None when the experience ended in a terminal state.
    """
    #Adding an extra dimension to the state so it can be fed-forward through our network.
    state = state[:, np.newaxis]
    #Here we use the experience to generate our target values for each action.
    target_values = get_target_values(player, state, chosen_action, reward, next_state, gamma, game, standard_reward)
    #We treat this single experience as a mini-batch of size 1: the state is the 'input' and target_values the 'label'.
    player.mini_batch_gradient_descent([(state, target_values)], eta, lmbda, 1)


def Qlearn(total_games, eta, gamma, p1, p2, evaluate = False, save_stats = False, lmbda = 0):
    """Train one or two networks by self-play using Q-learning, updating once per move.

    Args:
        total_games: number of training games to play.
        eta: learning rate passed to the networks' gradient descent.
        gamma: discounting factor used when building the target values.
        p1, p2: network objects (exposing .game, .feedforward and
            .mini_batch_gradient_descent) or the string "random". p1 always
            moves first. At least one player must be a network.
        evaluate: if true, both players are played against a random opponent
            for 1000 games at every 10% training interval.
        save_stats: if true, statistics are recorded and returned.
        lmbda: L2 regularisation hyperparameter passed to gradient descent.

    Returns:
        (vs_games, running_scores, scores_vs_random) when save_stats is true,
        otherwise None. None is also returned (after printing an error) when
        the players cannot be trained together.
    """
    p1_is_network = p1 != "random"
    p2_is_network = p2 != "random"

    #Without at least one network there is nothing to train and no way of knowing which game to play.
    if not p1_is_network and not p2_is_network:
        print("Error: At least one player must be a network")
        return
    #When both players are networks we check that they want to play the same game.
    if p1_is_network and p2_is_network and p1.game != p2.game:
        print("Error: Opponents are trying to play different games")
        return
    game = p1.game if p1_is_network else p2.game

    (p1_score, p2_score) = (0, 0) #Setting the initial scores to be 0-0.
    max_games = total_games  #Saving the number of games to be played overall (as total_games is decremented below).
    standard_reward = 1   #The reward/penalty given out after winning/losing moves (invalid moves are penalised inside select_move).

    #Preparing containers for saving stats.
    if save_stats:
        running_scores = np.zeros((total_games,2))
        vs_games = []
        #Row 0 holds each player's score against a random opponent before any training, as a 'control test'.
        scores_vs_random = np.zeros((11, 2))
        (p1_wins, p1_losses, p1_draws) = NetworkVsNetwork(1000, p1, "random", False, False, game)
        (p2_losses, p2_wins, p2_draws) = NetworkVsNetwork(1000, "random", p2, False, False, game)
        scores_vs_random[0] = (p1_wins-p1_losses, p2_wins-p2_losses)
        i = 1

    #Playing one training game per iteration and counting total_games down to 0.
    while total_games > 0:
        board, game_over, current_turn = reset_game(game)  #Resetting the turn number and board for each new game.
        (current_player, waiting_player) = (p1, p2)  #p1 is always the starting player.
        while not game_over:
            initial_state = board.flatten()  #The board as seen by the current player, in vectorised form.
            chosen_action, action_values = select_move(current_player, initial_state, eta, lmbda, game, standard_reward)
            initial_scores = (p1_score, p2_score)  #Recording the scores before the move so that a win can be told apart from a draw.
            board, current_turn, p1_score, p2_score, game_over = input_move(chosen_action, False, game, board, current_turn, p1_score, p2_score)

            if game_over:
                #A player can't lose on their own turn, so the game ended with the current player winning or with a draw.
                #Unchanged scores mean a draw (neutral reward of 0); otherwise the winner gets the standard reward and the loser an equal penalty.
                reward = 0 if (p1_score, p2_score) == initial_scores else standard_reward
                if current_player != "random":
                    _qlearn_update(current_player, initial_state, chosen_action, reward, None, eta, gamma, lmbda, game, standard_reward)
                if waiting_player != "random":
                    _qlearn_update(waiting_player, wp_initial_state, wp_chosen_action, -reward, None, eta, gamma, lmbda, game, standard_reward)
            elif current_turn != 2 and waiting_player != "random":
                #The board after the current player's move is the waiting player's next actionable state, which is what its previous move is valued against.
                #On the second turn the waiting player hasn't made a move yet, so there is nothing to update.
                _qlearn_update(waiting_player, wp_initial_state, wp_chosen_action, 0, board.flatten(), eta, gamma, lmbda, game, standard_reward)

            #After each turn we swap the active player with the waiting player.
            (current_player, waiting_player) = (waiting_player, current_player)
            #Saving the most recent player's board state and chosen action for use in the next update step.
            wp_initial_state = initial_state
            wp_chosen_action = chosen_action

        #Progress checkpoint at each 10% training interval and on the final game.
        #NOTE: the interval max_games*0.1 is a float, so the modulo test is only reliable when max_games is a multiple of 10.
        if total_games%(max_games*0.1) == 0 or total_games == 1:
            #Optionally evaluating both players against random opponents (skipped before any training has happened).
            if total_games != max_games and evaluate:
                (p1_wins, p1_losses, p1_draws) = NetworkVsNetwork(1000, p1, "random", False, False, game)
                (p2_losses, p2_wins, p2_draws) = NetworkVsNetwork(1000, "random", p2, False, False, game)
                if save_stats:
                    scores_vs_random[i] = (p1_wins-p1_losses, p2_wins-p2_losses)
                    i += 1
                    #Also saving a replay of a single game between the two players at this checkpoint.
                    game_replay = NetworkVsNetwork(1, p1, p2, False, True, game)
                    vs_games.append(game_replay)

            #Printing how many games are left at each checkpoint.
            if total_games != 1: print("Games left: ",total_games)

        #Saving the running scores.
        if save_stats: running_scores[max_games-total_games] = (p1_score, p2_score)
        total_games = total_games - 1  #Decrementing total_games after a game has been played to completion.

    if save_stats: return (vs_games, running_scores, scores_vs_random)
    return

In [4]:
#Code for selecting a move during the training of a neural network.
#The board state will be fed-forward through the network to produce the network's estimates of each action's action-value.
#A move will be chosen based on these values and on the network's exploration/exploitation strategy.
def select_move(player, state, eta, lmbda, game, standard_reward):
    """Choose a move for player in the given board state during training.

    Args:
        player: a network object or the string "random".
        state: the flattened (1-D) board state.
        eta, lmbda: learning rate and L2 hyperparameter, used only if the
            network has to be penalised for picking an invalid move.
        game: name of the game, passed on to get_valid_moves.
        standard_reward: base reward; an invalid move is penalised with
            -5*standard_reward.

    Returns:
        (chosen_action, action_values). action_values is None for a random
        player, otherwise the network's output for this state.

    Raises:
        ValueError: if the network's exploration_type is not supported.
    """
    state = state[:, np.newaxis]
    #A random player has no action values; otherwise we feed the board into the network to get its estimate of each action's value.
    if player == "random": action_values = None
    else: action_values = player.feedforward(state)
    #The indices of valid moves, i.e. empty squares for noughts and crosses or non-full columns for connect 4.
    valid_moves = get_valid_moves(state, game)

    #If we entered "random" as our network then our move is always chosen randomly.
    if player == "random":
        chosen_action = random.choice(valid_moves)

    #For epsilon-greedy exploration we explore randomly with probability epsilon each move (epsilon is player.exploration_parameter).
    elif player.exploration_type == "epsilon-greedy":
        if np.random.uniform() < player.exploration_parameter:
            chosen_action = random.choice(valid_moves)
        else:
            #Otherwise we "exploit" and choose the action with the highest value in our state.
            chosen_action = np.argmax(action_values)
            if chosen_action not in valid_moves:
                #An invalid move is treated as a loss: we punish the network with a reward of -5*standard_reward and a single step of gradient descent.
                column_state = state.flatten()[:, np.newaxis]
                target_values = get_target_values(player, column_state, chosen_action, -5*standard_reward, None, 0, game, standard_reward)
                player.mini_batch_gradient_descent([(column_state, target_values)], eta, lmbda, 1)
                #Having punished the network we let the game continue with the highest-valued valid move.
                #Selecting directly among the valid indices always terminates, even if the outputs are huge or NaN.
                valid_indices = np.sort(np.asarray(valid_moves, dtype=int).ravel())
                chosen_action = valid_indices[np.argmax(np.ravel(action_values)[valid_indices])]

    else:
        #Previously this fell through and failed at the return statement with an unbound chosen_action.
        raise ValueError("Unsupported exploration type: " + repr(player.exploration_type))

    return chosen_action, action_values
    
#Here we determine the vector of target values used in our backpropagation step for reinforcement learning. 
#The action-values for the unchosen actions are typically left unchanged; they are only clipped when they fall outside the expected range (-1, 1).
def get_target_values(network, initial_state, chosen_action, reward, next_state, gamma, game, standard_reward):
    """Build the vector of target action-values for one Q-learning update.

    Args:
        network: object whose feedforward(state) returns the action-values.
        initial_state: the state the action was chosen in, shaped as the
            network expects (callers pass a column vector).
        chosen_action: index of the action that was taken.
        reward: instantaneous reward observed for the action.
        next_state: the next state the player will act on (flat or column
            vector), or None if the action led to a terminal state.
        gamma: discounting factor.
        game: name of the game, passed on to get_valid_moves.
        standard_reward: bound used to clip the unchosen actions' values.

    Returns:
        A new array of target values with the same shape as the network's
        output for initial_state.
    """
    initial_values = network.feedforward(initial_state)
    #The only change to the unchosen moves is to clip their targets to [-standard_reward, standard_reward] (usually they are already inside it).
    #np.where builds a fresh array, so the network's own output is never modified.
    target_values = np.where(abs(initial_values)>standard_reward, np.sign(initial_values)*standard_reward, initial_values)

    if next_state is None:
        #A terminal state has no future value.
        next_state_max_value = 0
    else:
        #Qlearn passes next_state as a flat vector whereas every other feedforward call uses a column vector, so we normalise the shape here.
        next_state = np.reshape(next_state, (-1, 1))
        next_state_values = np.ravel(network.feedforward(next_state))
        next_state_valid_moves = get_valid_moves(next_state, game)
        #The maximum action value attainable from valid actions in the next state.
        next_state_max_value = np.max([next_state_values[index] for index in next_state_valid_moves])

    #Setting the target of the chosen action using the RHS of the Bellman optimality equation (for action-values).
    target_values[chosen_action] = reward + gamma*next_state_max_value
    return target_values


In [25]:
#Code that allows two networks to play against each other at either "noughts and crosses" or "connect 4".
#total_games is the number of games to be played and also a random opponent can be specified using the string "random".
#If show_game is set to True then each turn of the game will be printed out:
#Here player 1's moves are represented by +1 on the board and player 2's moves are represented by -1.

def NetworkVsNetwork(total_games, p1, p2, show_game, save_game, game):
    """Play evaluation games (no learning, no exploration) between two players.

    Args:
        total_games: number of games to play.
        p1, p2: network objects or the string "random"; p1 always starts.
        show_game: passed to input_move for each move, and if true the final
            scores are printed.
        save_game: if true and exactly one game is requested, a replay of
            the game is recorded and returned.
        game: name of the game being played.

    Returns:
        The replay (a list of board copies, starting with the empty board)
        when save_game is true and total_games is 1; otherwise the tuple
        (p1_score, p2_score, draws).
    """
    #Resetting the scores; these are kept track of over all the games.
    (p1_score, p2_score) = (0, 0)
    max_games = total_games
    #Replays are only recorded when a single game is being played.
    record_replay = save_game == True and max_games == 1

    #Loop over all games to be played.
    while total_games > 0:
        #Resetting the turn number and board for each new game and setting p1 to always be the starting player.
        board, game_over, current_turn = reset_game(game)
        (current_player, waiting_player) = (p1, p2)
        if record_replay: game_replay = [np.copy(board)]
        #Game loop: the current player moves until one of the moves ends the game.
        while not game_over:
            state = np.reshape(board, (np.size(board), 1))
            valid_moves = get_valid_moves(state, game)
            if current_player != "random":
                #Here we are no longer exploring so we always choose the greedy move.
                move_values = current_player.feedforward(state)
                chosen_action = np.argmax(move_values)
                if chosen_action not in valid_moves:
                    #If the greedy move is invalid we fall back to the highest-valued valid move.
                    #Selecting directly among the valid indices always terminates, even if the outputs are huge or NaN.
                    valid_indices = np.sort(np.asarray(valid_moves, dtype=int).ravel())
                    chosen_action = valid_indices[np.argmax(np.ravel(move_values)[valid_indices])]
            else:
                chosen_action = random.choice(valid_moves)
            #Inputting the network's move to the board (or a random move if using a random opponent).
            board, current_turn, p1_score, p2_score, game_over = input_move(chosen_action, show_game, game, board, current_turn, p1_score, p2_score)
            #Saving a replay of the game if specified.
            if record_replay: game_replay.append(np.copy(board))
            #After each turn we swap the active player with the waiting player and continue the game.
            (current_player, waiting_player) = (waiting_player, current_player)
        total_games = total_games-1

    #Every game not won by either player is counted as a draw.
    draws = max_games-p1_score-p2_score
    if show_game == True:
        print("")
        print("Final scores (p1, p2): (",p1_score,"-",p2_score,")")
        print("Draws = ",draws)
        print("")
    if record_replay: return game_replay
    return (p1_score, p2_score, draws)

In [60]:
#Code that allows a human to play against a neural network at either "noughts and crosses" or "connect 4".
#Here player 1's moves are represented by +1 on the board and player 2's moves are represented by -1.

def _prompt_human_move(first_prompt, invalid_prompt, full_prompt, number_of_choices, valid_moves):
    """Keep asking the human for a move until a valid one is entered; returns its 0-based index."""
    prompt = first_prompt
    while True:
        try:
            human_move = int(input(prompt))
        except ValueError:
            #Non-numeric input is treated like any other invalid choice instead of crashing the game.
            prompt = invalid_prompt
            continue
        if not 1 <= human_move <= number_of_choices:
            prompt = invalid_prompt
        elif human_move-1 not in valid_moves:
            prompt = full_prompt
        else:
            #Since the actual indices start from 0 we have to remove 1 from the selected move.
            return human_move - 1


def HumanVsNetwork(p1, p2, game):
    """Play a single game in which a human can play against a network.

    Args:
        p1, p2: a network object, or the string "human" for the seat the
            human plays from. p1 always moves first.
        game: "noughts and crosses" or "connect 4".

    The board is printed before every move and moves are entered as 1-based
    square/column numbers. Nothing is returned.
    """
    #Resetting the turn number and board and setting p1 to be the starting player.
    board, game_over, current_turn = reset_game(game)
    p1_score, p2_score = 0, 0
    (current_player, waiting_player) = (p1, p2)
    #Game loop; as long as the game hasn't been ended by the previous player then the new current player chooses their move.
    while not game_over:
        #Reshaping the board so it's ready to be printed for the player to see.
        if game == "noughts and crosses":
            print_board = np.reshape(board, (3,3))
        if game == "connect 4":
            print_board = np.reshape(board, (6,7))
        valid_moves = get_valid_moves(board, game)

        #Human's turn.
        if current_player == "human":
            print("Your turn: \n",print_board)
            if game == "noughts and crosses":
                chosen_move = _prompt_human_move("Choose a square: ",
                                                 "Invalid move, please choose a number from 1 to 9: ",
                                                 "Chosen square is full, please choose another square: ",
                                                 9, valid_moves)
            if game == "connect 4":
                chosen_move = _prompt_human_move("Choose a column: ",
                                                 "Invalid move, please choose a number from 1 to 7: ",
                                                 "Chosen column is full, please choose another column: ",
                                                 7, valid_moves)

        #Network's turn.
        else:
            print("Network's turn \n",print_board)
            state = np.reshape(board, (np.size(board), 1))
            move_values = current_player.feedforward(state)
            chosen_move = np.argmax(move_values)
            if chosen_move not in valid_moves:
                #If the greedy move is invalid we fall back to the highest-valued valid move.
                #Selecting directly among the valid indices always terminates, even if the outputs are huge or NaN.
                valid_indices = np.sort(np.asarray(valid_moves, dtype=int).ravel())
                chosen_move = valid_indices[np.argmax(np.ravel(move_values)[valid_indices])]
            if game == "noughts and crosses":
                print("Network chose square ",chosen_move+1)
            if game == "connect 4":
                print("Network chose column ",chosen_move+1)

        #Inputting either the human's or the network's move to the board.
        board, current_turn, p1_score, p2_score, game_over = input_move(chosen_move, False, game, board, current_turn, p1_score, p2_score)
        #Here we swap the active player with the waiting player and continue the game.
        (current_player, waiting_player) = (waiting_player, current_player)

In [None]:
#Function for plotting win rates against a random opponent obtained during training.
def plot_stats(stats, eval_games = 1000):
    """Plot the statistics returned by Qlearn when it is run with save_stats enabled.

    Args:
        stats: the tuple (vs_games, running_scores, scores_vs_random).
        eval_games: number of games behind each row of scores_vs_random,
            used to turn the score differences into per-game averages
            (Qlearn plays 1000 evaluation games).

    Two figures are shown: the running scores of both players over all
    training games, and each player's average score against a random
    opponent at evenly spaced training checkpoints.
    """
    #Unpacking the stats tuple.
    (vs_games, running_scores, scores_vs_random) = stats
    total_games = len(running_scores)
    #One evenly spaced checkpoint per row of scores_vs_random, from 0 games up to total_games.
    #linspace always yields exactly that many points, unlike arange with a float step which overshoots for small totals.
    checkpoints = np.linspace(0, total_games, len(scores_vs_random))

    #Plotting the running scores of each network during all training games.
    games_played = np.arange(1,total_games+1)
    #Plotting player 1 in red and player 2 in blue.
    plt.figure(figsize=(8,5))
    plt.plot(games_played, running_scores[:,0], color='red')
    plt.plot(games_played, running_scores[:,1], color='blue')
    #Labelling axes and framing plot window.
    plt.xlim([0, plt.xlim()[1]])
    plt.xticks(checkpoints)
    plt.ylim([0,plt.ylim()[1]])
    plt.xlabel("Training games played", fontsize=14)
    plt.ylabel("Scores during training",fontsize=14)
    plt.legend(["Player 1 Score", "Player 2 Score"], loc="upper left", fontsize=10)
    plt.show()

    #Plotting the average score of each player against a random opponent at each checkpoint.
    bar_width = 0.03*total_games
    bar_offset = 0.015*total_games
    plt.figure(figsize=(8,5))
    plt.bar(checkpoints-bar_offset, scores_vs_random[:,0]/eval_games, bar_width, color='red', edgecolor = 'black') #Plotting player 1 in red.
    plt.bar(checkpoints+bar_offset, scores_vs_random[:,1]/eval_games, bar_width, color='blue', edgecolor = 'black') #Plotting player 2 in blue.
    plt.legend(["Player 1", "Player 2"], loc="upper left", fontsize=10)
    #Drawing the zero line so negative average scores are easy to read.
    plt.plot([-2*bar_width, total_games+2*bar_width], [0, 0], 'k', linewidth=0.9)
    #Labelling axes and framing plot window.
    plt.xlim([-2*bar_width, total_games+2*bar_width])
    plt.xticks(checkpoints)
    plt.ylim([plt.ylim()[0], 1])
    plt.xlabel("Training games played", fontsize = 14)
    plt.ylabel("Average score against random opponent", fontsize = 12)
    #Showing the second figure explicitly, as is done for the first one.
    plt.show()
     