# Q-Learning

In [None]:
# Package loading
import numpy as np
import random
import matplotlib.pyplot as plt
import time
from tic_env import TictactoeEnv, OptimalPlayer
import seaborn as sn

In [None]:
class QPlayer:
    "Class for a Q-Learning agent player"
    states_value = dict() # q_value table, shared by all class instances 
    def __init__(self, epsilon, alpha = 0.05, gamma = 0.99, player_name = 'O'):
        """
        Initializer.
        INPUTS: 
        - epsilon: exploration level to define the epsilon-greedy policy; the agent takes the best action with probability 1-epsilon
        - alpha: learning rate
        - gamma: discount factor
        - player_name: player symbol on the board
        """
        self.epsilon = epsilon
        self.alpha = alpha 
        self.gamma = gamma 
        self.player_name = player_name 

    def set_epsilon(self, epsilon):
        """
        Functions that sets the exploration level.
        INPUTS:
        - epsilon: exploration level
        """
        self.epsilon = epsilon
        
    def set_player(self, player_name = 'O', j=-1):
        """
        Function to set player symbol, needed to alternate the starting player.
        INPUTS:
        - player_name: player symbol on the board
        - j : number of the current game
        """
        self.player_name = player_name
        if j != -1:
            self.player_name = 'O' if j % 2 == 0 else 'X'
            
    def empty(self, grid):
        """
        Function to return available actions given a board situation.
        INPUTS: 
        - grid: current board status
        OUTPUTS:
        - avail: array with available actions
        """
        avail = []
        for i in range(9):
            pos = (int(i/3), i % 3)
            if grid[pos] == 0:
                avail.append(i)
        return avail
    
    def get_state_key(self, grid):
        """
        Function transforing the grid configuration into a string, needed for use it as key of a dictionary.
        INPUTS:
        - grid: a board status
        OUTPUTS:
        - key: correspondent string
        """ 
        key = str(grid.reshape(3 * 3))
        return key

    def correct_grid(self, grid):
        """
        Function for correct storage of states: boards with same configurations for the q-player, but with different chesses should count as the same state.
        INPUTS:
        - grid: a board status
        OUTPUT:
        - a string associated to the state, with the convention that the q-player positions are always represented by "1"
        """
        if self.player_name == 'X': # we choose to store states with "1." <-> "X" to indicate the q_player
            return self.get_state_key(grid)
        else:
            new_grid = -grid
            return self.get_state_key(new_grid)
    
    def select_optimal_action(self, grid):
        """
        Function selecting the available action with the highest Q-value, given a state.
        INPUTS:
        - grid: a board status
        OUTPUTS:
        - move: action with the highest Q-value
        """
        avail_actions = self.empty(grid) # find all available actions
        key = self.correct_grid(grid) # get the string corresponding to the current state
        if key in self.states_value : 
            # IF THE STATE HAS ALREADY BEEN EXPLORED
            restricted_vector = self.states_value[key][avail_actions] # array with the Q-values of the available actions
            idx = np.argwhere(restricted_vector == np.amax(restricted_vector))[:,0] # position of maximum of restricted_vector
            move = avail_actions[random.choice(idx)] # pick randomply one of the actions with maximum value
        else :
            # IF THE STATE IS EXPLORED FOR THE FIRST TIME
            move = self.select_random_action(grid) # pick randomly the action to do 
        return move
    
    def select_random_action(self,grid):
        """
        Function selecting a random available action, given a state.
        INPUTS:
        - grid: a board status
        OUTPUTS:
        - move: a random available action
        """
        key = self.correct_grid(grid)
        # IF THE STATE IS EXPLORED FOR THE FIRST TIME
        if key not in self.states_value:
                self.states_value[key] = np.zeros([9,1]) # insert the key into the dictionary and a zero-array for the values
        actions = self.empty(grid) # find all available actions
        move = random.choice(actions) # pick randomly the action to do
        return move
    
    def act(self,grid):
        """
        Function selecting an available action given a state following the epsilon-greedy policy.
        INPUTS:
        - grid: a board status
        OUTPUTS:
        - move: an available action
        """
        b = np.random.binomial(1,1-self.epsilon) # generation of a bernoulli random variable
        # choose the action with the highest Q-value with probability 1-epsilon
        if b == 1 : 
            return self.select_optimal_action(grid)
            
        else : 
            return self.select_random_action(grid)    
 

In [None]:
class QLearning:
    "Class for learning from experts"
    def __init__(self, epsilon_q, epsilon_opp, alpha = 0.05, gamma = 0.99):
        """
        Initializer
        INPUTS:
        - epsilon1: exploration level of q_player
        - epsilon2: exploration level of opponent
        - alpha: learning rate
        - gamma: discount factor
        """
        self.q_player = QPlayer(epsilon_q, alpha, gamma) # Q-player
        self.opponent = OptimalPlayer(epsilon_opp) # Opponent
        self.env = TictactoeEnv() # Tic-Tac-Toe environment
        self.reward_vec = [] # vector to store the rewards during several games

        # thresholds for computing training time
        self.M_rand_threshold = 0.856 * 0.8
        self.M_opt_threshold = 0

    def game_train(self, eps_q_player, eps_opponent = 0.5):
        """
        Function to play a training game.
        INPUTS:
        - eps_q_player: exploration level of q_player
        - eps_opponent: exploration level of opponent (default = 0.5)
        """
        self.env.reset() # reset the board
        
        #setting exploration levels
        self.q_player.epsilon = eps_q_player
        self.opponent.epsilon = eps_opponent
        # vector to store states, initialized with empty board
        states = [] 
        states.append(self.env.grid.copy())
        move_q = -1 # initialization of move of q_player
        
        while not self.env.end: # while the game is not over
            
            if self.env.current_player == self.q_player.player_name :
                # action of the q_player
                move_q = self.q_player.act(states[-1]) 
                self.env.step(move_q)
            else:
                # action of the opponent
                move_opp = self.opponent.act(states[-1])
                self.env.step(move_opp)
            # storage of the grid after the move    
            states.append(self.env.grid.copy())

            # update the q_values before the move of the q_player, after the first move
            if self.env.current_player == self.q_player.player_name and move_q != -1:
                self.update_q(self.q_player.player_name, states[-3], states[-1], move_q)

        # to avoid missing one case of update, when the game ends after the move of the q-player
        if self.env.end and self.env.current_player == self.opponent.player :
            self.update_q(self.q_player.player_name, states[-2], states[-1], move_q)

        # return reward of the q-player
        return self.env.reward(self.q_player.player_name)

    def game_test(self, eps_q_player, eps_opponent = 0):
        """
        Function to play a test game.
        INPUTS:
        - eps_q_player: exploration level of q_player
        - eps_opponent: exploration level of opponent (default = 0.5)
        """
        self.env.reset() # reset the board

        #setting exploration levels
        self.q_player.epsilon = eps_q_player
        self.opponent.epsilon = eps_opponent
        
        while not self.env.end: # while the game is not over
            
            if self.env.current_player == self.q_player.player_name :
                # action of the q_player
                move_q = self.q_player.act(self.env.grid)
                self.env.step(move_q)
            else:
                # action of the opponent
                move_opp = self.opponent.act(self.env.grid)
                self.env.step(move_opp)

        # return reward of the q-player
        return self.env.reward(self.q_player.player_name)
         
        
    def update_q(self, player_name, state_prec, state, move_q):
        """
        Function updating the table of Q-values.
        INPUT:
        - player_name: symbol of the player
        - state_prec: precendent state visited by the agent
        - state: state visited by the agent
        - move_q: action performed by the agent to pass from state_prec to state
        """
        r = self.env.reward(player_name) # get the reward of the q_player 
        # get strings correspondent to states
        key = self.q_player.correct_grid(state)
        key_prec = self.q_player.correct_grid(state_prec)
        # check if the state is already present in states_value
        if key not in self.q_player.states_value:
            self.q_player.states_value[key] = np.zeros([9,1])
        if key_prec not in self.q_player.states_value:
            self.q_player.states_value[key_prec] = np.zeros([9,1])
        # update    
        self.q_player.states_value[key_prec][move_q] += self.q_player.alpha*(r + self.q_player.gamma*max([self.q_player.states_value[key][pos] for pos in self.q_player.empty(state)], default=0)-self.q_player.states_value[key_prec][move_q])  

    def play_n_games(self, n, epsilon, train_freq = 250, eps_opponent_train = 0.5, ntt = 0, get_reward = True):
        """
        Function to play n games and compute average reward during training.
        INPUTS:
        - n: number of games to play
        - epsilon: vector of exploration levels of Q-player
        - train_freq: frequence of computation of the average reward during training
        - eps_opponent_train: epsilon of the opponent player for training (default to 0.5)
        - ntt: total number of training games (default to 0)
        - get_reward: true if we want to store the averaged reward
        """
        # initializing the variable for the cumulated reward
        r = 0
        for i in range(n):
            # since X always plays first, we need to exchange the id corresponding to the player each time
            self.q_player.set_player('O',i)
            self.opponent.set_player('X',i)
            # store reward of the current game
            r += self.game_train(epsilon[i+ntt], eps_opponent = eps_opponent_train)
            # every 250 games, compute and store the averaged reward (if needed)
            if (i+1)%train_freq == 0 and i != 0 and get_reward:
                self.reward_vec.append(r/train_freq)
                r = 0 # clean the variable for the cumulated sum

    def test(self,n_test,epsilon_opp):
        """
        Function to play n_test games.
        INPUTS:
        - n_test: number of test games
        - epsilon: exploration level of the opponent
        OUTPUTS:
        - M/n_test: averaged reward
        """
        M = 0 # variable to store the cumulated reward
        for i in range(n_test):
            # switching the first player at each game
            self.q_player.set_player('O',i)
            self.opponent.set_player('X',i)
            # playing a game
            new_game = self.game_test(0, eps_opponent = epsilon_opp)
            M += new_game
        return M/n_test
                
    def play_train_test(self, epsilon, eps_opponent_train = 0.5, n_train = 250, n_tot_train = 20000, n_test = 500):
        """
        Function to play n_tot_train games and compute average reward every n_train games with n_test test games.
        INPUTS:
        - epsilon: exploration level of Q-player
        - eps_opponent_train: epsilon of the opponent (default to 0.5)
        - n_train: frequence of performing the testing during training
        - n_tot_train: total number of training games
        - n_test: number of games to test with
        OUTPUTS:
        - M_opt_vec: array with averaged rewards on n_test games against the OPTIMAL player, computed each n_train games during training
        - M_rand_vec: array with averaged rewards on n_test games against the OPTIMAL player, computed each n_train games during training
        """
        ntt = 0 # index of training game
        M_opt_vec = []
        M_rand_vec = []
        print_time = True
        if print_time :
            start = time.time()
        while ntt < n_tot_train:
            # play n_train training games
            self.play_n_games(n_train , epsilon, eps_opponent_train = eps_opponent_train, ntt = ntt, get_reward = False)
            # each n_train games, perform the test
            if ntt!=0 and ntt%n_train == 0 :
                M_rand_vec.append(self.test(n_test,1))
                M_opt_vec.append(self.test(n_test,0))
                if print_time and M_rand_vec[-1]>= self.M_rand_threshold and M_opt_vec[-1]>=self.M_opt_threshold :
                    end = time.time()
                    print('Converged in: ', end-start)
                    print_time = False

            # update the effective number of training games
            ntt += n_train
            
        return M_opt_vec, M_rand_vec
                    
            
    def plot_avg_reward(self, n, epsilon, step=250):
        """
        Plotting average reward during training, for a specific value of exploration level.
        INPUTS:
        - n: number of games
        - epsilon: exploration level of the q_player
        - step: frequency of the samples
        """
        xx = np.arange(step,n+1,step)
        yy = self.reward_vec
        fig = plt.figure() # opens a new figure for each epsilon
        plt.plot(xx,yy, label = "epsilon="+str(epsilon))
        plt.legend()
        plt.grid()
        plt.xlabel("Nb of games")
        plt.ylabel("Avg reward")
        fig.savefig("ploteps_"+str(epsilon)+".png")
        
    def plot_N_star_reward(self, N_star, n, rewards, step=250):
        """
        Plotting average reward during training, for different values of N_star.
        INPUTS:
        - N_star: N_star
        - n: number of games
        - rewards: vector of rewards computed each n 
        - step: frequency of the samples
        """
        xx = np.arange(step,n+1,step)
        fig = plt.figure() # one figure for each N*
        for i in range(len(N_star)):
            plt.plot(xx,rewards[i], label = 'N*='+str(N_star[i]), linewidth=1, )
        plt.legend(bbox_to_anchor=(1.04,1), loc="upper left") 
        plt.grid()
        plt.xlabel("Nb of games")
        plt.ylabel("Avg reward")
        fig.savefig("plot_nstar_ql.png")

## Question 1

In [None]:
## Question 1

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2023)
eps_vector = [0.1,0.2,0.3,0.4,0.5] # vector of epsilons to try
rew_epsilon = []

N = 20000
for epsilon in eps_vector:
    game = QLearning(epsilon,0.5)
    game.q_player.states_value = dict() # inizialize Q-value table
    game.play_n_games(N, epsilon*np.ones(N), eps_opponent_train = 0.5)
    game.plot_avg_reward(N,epsilon)
    rew_epsilon.append(game.reward_vec)
    print('epsilon=' + str(epsilon) + ' Mean reward: ' + str(np.mean(game.reward_vec)))

## Question 2

In [None]:
# Question 2

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2023)
e_min = 0.1
e_max = 0.8
rew_N_star = []
N = 20000
N_star_vec = [1, 100, 1000, 10000, 15000, 20000, 30000,40000] # vector of N_star to try
for N_star in N_star_vec :
    epsilon = []
    for i in np.arange(1,N+1):
        epsilon.append(max(e_min,e_max*(1-i/N_star)))
    game = QLearning(0,0.5)
    game.q_player.states_value = dict()
    game.play_n_games(N,epsilon)
    rew_N_star.append(game.reward_vec)
    print('N*='+str(N_star)+ ' Mean reward: ' + str(np.mean(game.reward_vec[40:])))
game.plot_N_star_reward(N_star_vec,N,rew_N_star)
    

## Question 3

In [None]:
## Question 3

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2023)
e_min = 0.1
e_max = 0.8
N = 20000 # number of training games
N_star_vec = [1,10,100, 1000, 10000, 15000, 20000, 30000,40000] # vector of N_star to try
plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()
for N_star in N_star_vec :
    # generating the vector of decreasing explorations
    epsilon = []
    for i in np.arange(1,N+1):
        epsilon.append(max(e_min,e_max*(1-i/N_star)))
    # initializing an instance of the QLearning class with the generated epsilon vector
    game = QLearning(epsilon, 0.5)
    game.q_player.states_value = dict() # cleaning the Q-value table
    M_opt_vec, M_rand_vec = game.play_train_test(epsilon, n_train = 250, n_tot_train = N, n_test = 500)
    # printing M_opt and M_rand means (after half of the games) for the corresponding N*
    print('N_star=' + str(N_star) + ' M_opt mean: ' + str(np.mean(M_opt_vec[40:])))
    print('N_star=' + str(N_star) + ' M_rand mean: ' + str(np.mean(M_rand_vec[40:])))
    # plotting M_opt and M_rand for the corresponding N*
    ax1.plot(np.arange(1,len(M_opt_vec)+1)*250, M_opt_vec, label = 'N*='+str(N_star))
    ax1.set_title('M_opt')
    plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
    ax1.grid()
    ax2.plot(np.arange(1,len(M_rand_vec)+1)*250, M_rand_vec, label = 'N*='+str(N_star))
    plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")
    ax2.grid()
    ax2.set_title('M_rand')

## Question 4

In [None]:
## Question 4

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2022)
epsilon = []
e_min = 0.1
e_max = 0.8
N = 20000
N_star = 100 # choice of the best N_star

plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

# generation of the vector with decreasing exploration levels
for i in np.arange(1,N+1):
    epsilon.append(max(e_min,e_max*(1-i/N_star)))
    
eps_opt = [0, 0.2, 0.4, 0.6, 0.8, 1] # vector of eps_opt to try
for eps in eps_opt:
    game = QLearning(0, eps)
    game.q_player.states_value = dict() # cleaning the Q-value table
    M_opt_vec, M_rand_vec = game.play_train_test(epsilon, eps_opponent_train = eps, n_train = 250, n_tot_train = N, n_test = 500)
    # printing mean of M_opt and max & mean of M_rand for comparison among different eps_opt
    print('epsilon=' + str(eps) + ' M_opt mean: ' + str(np.mean(M_opt_vec[40:])))
    print('epsilon=' + str(eps) + ' M_rand max: ' + str(np.max(M_rand_vec)))
    print('epsilon=' + str(eps) + ' M_rand mean: ' + str(np.mean(M_rand_vec[40:])))
    ax1.plot(np.arange(1,len(M_opt_vec)+1)*250, M_opt_vec, label = 'eps='+str(eps))
    ax2.plot(np.arange(1,len(M_rand_vec)+1)*250, M_rand_vec, label = 'eps='+str(eps))
       
plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
ax1.grid()
ax1.set_title('M_opt')  
plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")
ax2.grid()
ax2.set_title('M_rand') 

In [None]:
class SelfQLearning:
    "Class for self-learning"
    def __init__(self, epsilon_q, epsilon_opp = 0.5, alpha = 0.05, gamma = 0.99):
        """
        Initializer
        INPUTS:
        - epsilon1: exploration level of q_player
        - epsilon2: exploration level of opponent
        - alpha: learning rate
        - gamma: discount factor
        """
        self.q_player1 = QPlayer(epsilon_q, alpha, gamma, player_name = 'O')
        self.q_player2 = QPlayer(epsilon_q, alpha, gamma, player_name = 'X')
        self.opponent = OptimalPlayer(epsilon_opp)
        self.env = TictactoeEnv()
        self.reward_vec = []

        # thresholds for computing training time
        self.M_opt_threshold = -0.044 * 1.2 # this is a negative value, so we cannot multiply by 0.8!
        self.M_rand_threshold = 0.866 * 0.8
        
    def game_train(self, eps_q_player):
        """
        Function to play a training game.
        INPUTS:
        - eps_q_player: exploration level of q_player
        - eps_opponent: exploration level of opponent (default = 0.5)
        """

        self.env.reset() # reset the board
        
        states = [] # to store states during the game
        states.append(self.env.grid.copy())
        
        # setting exploration level
        self.q_player1.epsilon = eps_q_player 
        self.q_player2.epsilon = eps_q_player
        
        move1 = -1 # initializing the moves
        move2 = -1
        
        while not self.env.end : # while the game is not over
            if self.env.current_player == self.q_player1.player_name :   
                # action of q_player 1
                move1 = self.q_player1.act(states[-1])
                self.env.step(move1)
                    
            else : 
                # action of q_player 2
                move2 = self.q_player2.act(states[-1])
                self.env.step(move2)
                
            states.append(self.env.grid.copy()) # storage of the grid after the move        

            # update the q_values before the move of the q_player, after the first move    
            if self.env.current_player == self.q_player1.player_name and move1 != -1: 
                self.update_q(self.q_player1.player_name, states[-3], states[-1], move1)
            if self.env.current_player == self.q_player2.player_name and move2 != -1: 
                self.update_q(self.q_player2.player_name, states[-3], states[-1], move2)

        # to avoid missing one case of update, when the game ends after the move of the q-player        
        if self.env.end and self.env.current_player == self.q_player2.player_name :
            self.update_q(self.q_player1.player_name, states[-2], states[-1], move1)
        if self.env.end and self.env.current_player == self.q_player1.player_name :
            self.update_q(self.q_player2.player_name, states[-2], states[-1], move2)
        
        
    def update_q(self, player_name, state_prec, state, move_q):
        """
        Function updating the table of Q-values
        INPUT:
        - player: id of the player
        - states: array with states visited by the agent
        - actions: array with actions performed by the agent
        """
        r = self.env.reward(player_name) # get the reward of the q_player 
        
        # the Q-values table of the terminal state is initialized with zeros
        if player_name == self.q_player1.player_name:
            key = self.q_player1.correct_grid(state)
            key_prec = self.q_player1.correct_grid(state_prec)
        else:
            key = self.q_player2.correct_grid(state)
            key_prec = self.q_player2.correct_grid(state_prec)

        # check if the state is already present 
        if key not in self.q_player1.states_value:
            self.q_player1.states_value[key] = np.zeros([9,1])
        if key_prec not in self.q_player1.states_value:
            self.q_player1.states_value[key_prec] = np.zeros([9,1])
            
        # update Q(s,a)
        self.q_player1.states_value[key_prec][move_q] += self.q_player1.alpha*(r + self.q_player1.gamma*max([self.q_player1.states_value[key][pos] for pos in self.q_player1.empty(state)], default=0)-self.q_player1.states_value[key_prec][move_q])  
       
    def game_test(self, eps_q_player, eps_opponent = 0):
        """
        Function to play a test game
        INPUTS:
        - eps_q_player: exploration level of q_player
        - eps_opponent: exploration level of opponent (default = 0.5)
        """
        self.env.reset() # reset the board

        #setting exploration levels
        self.q_player1.epsilon = eps_q_player
        self.opponent.epsilon = eps_opponent
        
        while not self.env.end: # while the game is not over
            
            if self.env.current_player == self.q_player1.player_name :
                # action of the q_player
                move_q = self.q_player1.act(self.env.grid)
                self.env.step(move_q)
            else:
                # action of the opponent
                move_opp = self.opponent.act(self.env.grid)
                self.env.step(move_opp)

        # return reward of the q-player
        return self.env.reward(self.q_player1.player_name)
            
    def play_n_games(self, n, epsilon, ntt = 0):
        """
        Function to play n training games.
        INPUTS:
        - n: number of games to play
        - epsilon: vector of exploration levels of Q-player
        - ntt: total number of training games (default to 0)
        """
        for i in range(n):
            self.q_player1.set_player('O',i)
            self.q_player2.set_player('X',i+1)
            self.game_train(epsilon[i+ntt])
                
    def play_train_test(self, epsilon, n_train = 250, n_tot_train = 20000, n_test = 500):
        """
        Function to play n_tot_train games and compute average reward every n_train games with n_test test games.
        INPUTS:
        - epsilon: exploration level of Q-player
        - n_train: frequence of performing the testing during training
        - n_tot_train: total number of training games
        - n_test: number of games to test with
        OUTPUTS:
        - M_opt_vec: array with averaged rewards on n_test games against the OPTIMAL player, computed each n_train games during training
        - M_rand_vec: array with averaged rewards on n_test games against the OPTIMAL player, computed each n_train games during training
        """
        ntt = 0 # index of training game
        M_opt_vec = []
        M_rand_vec = []
        print_time = True
        if print_time :
            start = time.time()
        while ntt < n_tot_train:
            # play n_train training games
            self.play_n_games(n_train, epsilon, ntt)
            # each n_train games, perform the test
            if ntt!=0 and ntt%n_train == 0 :
                M_opt_vec.append(self.test(n_test,0))
                M_rand_vec.append(self.test(n_test,1))
                if print_time and M_rand_vec[-1]>= self.M_rand_threshold and M_opt_vec[-1]>=self.M_opt_threshold :
                    end = time.time()
                    print('Converged in: ', end-start)
                    print_time = False
            # update the effective number of training games
            ntt += n_train
            
        return M_opt_vec, M_rand_vec
    
    def test(self,n_test,epsilon_opp):
        """
        Function to play n_test games.
        INPUTS:
        - n_test: number of test games
        - epsilon_opp: exploration level of the opponent
        OUTPUTS:
        - M/n_test: averaged reward
        """
        M = 0 # variable to store the cumulated reward
        for i in range(n_test):
            # switching the first player at each game
            self.q_player1.set_player('O',i)
            self.opponent.set_player('X',i)
            # playing a game
            new_game = self.game_test(0, eps_opponent = epsilon_opp)
            M += new_game
        return M/n_test
                
        
    def plot_N_star_reward(self,N_star, n, rewards, step = 250):
        """
        Plotting average reward during training, for different values of N_star.
        INPUTS:
        - N_star: N_star
        - n: number of games
        - rewards: vector of rewards computed each n 
        - step: frequency of the samples
        """
        xx = np.arange(step,n+1,step)
        fig = plt.figure() # one figure for each N*
        for i in range(len(N_star)):
            plt.plot(xx,rewards[i], label = 'N*='+str(N_star[i]))
        plt.legend()
        plt.xlabel("Nb of games")
        plt.ylabel("Avg reward")
        plt.grid()
        fig.savefig("plot_nstar_selfql.png")

## Question 7

In [None]:
## Question 7

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2023)
eps_vector = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] # vector of epsilons to try
rew_epsilon = []


plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

ax1.set_title('M_opt')
ax2.set_title('M_rand')

for epsilon in eps_vector:
    game = SelfQLearning(epsilon)
    game.q_player1.states_value = dict() # inizialize Q-value table
    M_opt_vec, M_rand_vec = game.play_train_test(epsilon*np.ones(20000), n_train = 250, n_tot_train = 20000, n_test = 500)
    ax1.plot(np.arange(1,len(M_opt_vec)+1)*250, M_opt_vec, label = 'eps='+str(epsilon))
    ax2.plot(np.arange(1,len(M_rand_vec)+1)*250, M_rand_vec, label = 'eps='+str(epsilon))

ax1.grid()
ax2.grid()

plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")


## Question 8

In [None]:
#Question 8

# setting seeds for reproducibility
np.random.seed(2022)
random.seed(2023)

plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

e_min = 0.1
e_max = 0.8
N = 20000
#N_star_vec = [1] #(best N_star, needed for the heatmaps of Q9)
N_star_vec = [1, 100, 1000, 10000, 15000, 20000, 30000,40000] # vector of N_star to try

for N_star in N_star_vec :
    # generating the vector of decreasing explorations
    epsilon = []
    for i in np.arange(1,N+1):
        epsilon.append(max(e_min,e_max*(1-i/N_star)))
    # initializing an instance of the QLearning class with the generated epsilon vector
    game = SelfQLearning(epsilon)
    game.q_player1.states_value = dict() # cleaning the Q-value table
    M_opt_vec, M_rand_vec = game.play_train_test(epsilon, n_train = 250, n_tot_train = N, n_test = 500)
    # printing M_rand max for the corresponding N*
    print('N_star=' + str(N_star) + ' M_opt mean: ' + str(np.mean(M_opt_vec[40:])))
    print('N_star=' + str(N_star) + ' M_rand mean: ' + str(np.mean(M_rand_vec[40:])))
    # plotting M_opt and M_rand for the corresponding N*
    ax1.plot(np.arange(1,len(M_opt_vec)+1)*250, M_opt_vec, label = 'N*='+str(N_star))
    ax1.set_title('M_opt')
    plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
    ax2.plot(np.arange(1,len(M_rand_vec)+1)*250, M_rand_vec, label = 'N*='+str(N_star))
    plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")
    ax2.set_title('M_rand')

ax1.grid()
ax2.grid()

plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")

## Question 10

In [None]:
#Question 10
# first state selected
states1 =  '[-1. -1. -0.  1. -0.  1. -1.  1. -1.]'
sn.heatmap(game.q_player1.states_value[states1].reshape(3,3), center = 0, square = True, linewidths=.5)

In [None]:
# second state selected
states2 = '[0. 0. 0. 0. 0. 0. 0. 0. 0.]'
sn.heatmap(game.q_player1.states_value[states2].reshape(3,3), center=0, square = True, linewidths=.5)

In [None]:
# third state selected
states3 = '[-1. -1. -0. -0. -0.  1. -0. -0. -0.]'
sn.heatmap(game.q_player1.states_value[states3].reshape(3,3), center = 0, linewidths=.5, square = True)

# DQN

In [None]:
# Package and environment loading
import random
import numpy as np
import time
import tensorflow as tf
from tic_env import TictactoeEnv, OptimalPlayer
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

In [None]:
def Net(n_hidden = 128):
  """
  Fully Conncected Neural Network with:
  - input of shape (3,3,2)
  - two hidden layers with n_hidden hidden neurons and ReLu as activation function
  - output layer of shape (1,9) and linear activation function
  """
  # Input layer
  inputs = layers.Input(shape=(3,3,2))
  flatten = layers.Flatten()(inputs) # flattening a input of (3,3,2) to (18,1)
  # Hidden layers
  layer1 = layers.Dense(n_hidden,activation="relu")(flatten)
  layer2 = layers.Dense(n_hidden,activation="relu")(layer1)
  # Output layer
  outputs = layers.Dense(9,activation ="linear")(layer2)

  return keras.Model(inputs=inputs,outputs=outputs)

In [None]:
class DQPlayer:
  """
  Class implementing the deep Q-player. 
  ATTRIBUTES: 
  - epsilon: exploration rate
  - player_name: player name (X or O)
  - model: net Net()
  - model_target: target net Net()
  - loss_function: Huber loss
  """
  def __init__(self, epsilon, player_name = 'O'):
    self.epsilon = epsilon #exploration rate
    self.player_name = player_name
    self.model = Net() 
    self.model_target = Net()
    self.loss_function = keras.losses.Huber()

  def set_player(self, player_name = 'O', j=-1):
        """
        Function to set player symbol, needed to alternate the starting player.
        INPUTS:
        - player_name: player symbol on the board
        - j : number of the current game
        """
        self.player_name = player_name
        if j != -1:
            self.player_name = 'O' if j % 2 == 0 else 'X'

  def set_epsilon(self, epsilon):
        """
        Functions that sets the exploration level.
        INPUTS:
        - epsilon: exploration level
        """
        self.epsilon = epsilon
    

In [None]:
class DQLearning:
  """
  Class implementing the Deep Q-learning algorithm (DQN) against the eps-optimal player
  """
  def __init__(self, epsilon, epsilon_opponent = 0.5, alpha = 1e-3, gamma = 0.99, player_name = 'O', N = 20000, n_train = 250, n_test = 500, buffer_size = 10000, batch_size = 64, test = False):
    self.epsilon = epsilon #exploration rate of the q-player 
    self.alpha = alpha  #learning rate
    self.gamma = gamma  #discount factor 
    self.dq_player = DQPlayer(epsilon) #dq-player
    self.opponent = OptimalPlayer(epsilon_opponent) #optimal player 
    self.env = TictactoeEnv()

    
    self.optimizer = keras.optimizers.Adam(learning_rate = self.alpha)
    self.N = N  #total number of training games
    self.n_train = n_train #number of training games between test
    self.n_test = n_test #number of games for test
    self.test = test #bool, true if we want to compute M_rand and M_opt
    self.batch_size = batch_size
    
    self.action_history = [] #vector collecting consecutive actions
    self.state_history = []  #vector collecting consecutive states 
    self.state_next_history = [] #vector collecting consecutive (next) states. Differs from the previous for the first and last elements
    self.rewards_history = [] #vector collecting consecutive history
    self.done_history = [] #vector collecting consecutive values of self.env.end

    self.episode_reward_history = [] #vector collecting dq-player rewards 
    self.loss_history = [] #vector collecting loss values at the end of consecutive games
    self.reward_mean = [] 
    self.loss_mean = []

    self.M_rand = []
    self.M_opt = []

    self.episode_count = 0 # game counter
    self.num_actions = 9 # number of actions available

    self.max_memory_length = buffer_size # buffer size
    self.update_target_network = 500 # how often to update the target network

    # thresholds for training time
    self.M_rand_threshold = 0.929 * 0.8
    self.M_opt_threshold = -0.0141 * 1.2

  def build_tensor(self, grid, player_name):
      """
      Functions that builds the tensor from the current grid.
      INPUTS:
      - grid: current grid
      - player_name: name of the dq-player 
      """
      my_state = grid.copy()
      opp_state = grid.copy()
      if player_name == 'X':
        my_state[my_state ==-1.] = 0.
        opp_state[opp_state==1.] = 0.
        opp_state[opp_state==-1.] = 1.
      if player_name == 'O':
        my_state[my_state==1.] = 0.
        my_state[my_state==-1.] = 1.
        opp_state[opp_state==-1.] = 0.

      my_tstate = tf.convert_to_tensor(my_state)
      opp_tstate = tf.convert_to_tensor(opp_state)

      ret_tensor = tf.stack([my_tstate,opp_tstate], axis = 2)

      ret_tensor = tf.reshape(ret_tensor, shape = [1,3,3,2])

      return ret_tensor

    
  def training(self, eps_opponent = 0.5):
    """
    Functions that runs self.N training games. If self.test = true, it runs self.n_test every self.n_train trainings.
    """
    self.episode_count = 0
    print_time = True
    start = time.time()
    while True:  # Run until solved, i.e. while episode_count < self.N
        self.dq_player.set_epsilon(self.epsilon[self.episode_count])
        self.opponent.epsilon = eps_opponent
        self.episode_count += 1
        state,_,_=self.env.reset()
        done = False
        episode_reward = 0
        #switch of the starting player
        self.dq_player.set_player('O', self.episode_count)
        self.opponent.set_player('X', self.episode_count)

        #first move if the starting player is the optimal player
        if self.env.current_player == self.opponent.player:
          move = self.opponent.act(state)
          state,_,_ = self.env.step(move)

        while not done: # while the game is not over
          state_tensor = self.build_tensor(state,self.dq_player.player_name)

          #action of the dq-player
          if self.dq_player.epsilon > np.random.rand(1)[0]:
                  # Take random action
                  action = np.random.choice(self.num_actions)
          else:
                  # Predict action Q-values
                  action_values = self.dq_player.model(state_tensor, training=False)
                  # Take best action
                  idx = np.argwhere(action_values[0].numpy() == np.amax(action_values[0].numpy()))[:,0]
                  action = random.choice(idx)

          #check if the action is available, otherwise end of the game
          try:
            state_adv,_,_ = self.env.step(int(action))
          except ValueError:
              self.env.end = True
              self.env.winner = self.opponent.player
          if not self.env.end:
            action_adv = self.opponent.act(state_adv)
            state_next,_,_ = self.env.step(action_adv)
          else :
            state_next = state
          
          reward = self.env.reward(self.dq_player.player_name)
          episode_reward += reward
          done = self.env.end

          self.action_history.append(action)
          self.state_history.append(self.build_tensor(state, self.dq_player.player_name))
          self.state_next_history.append(self.build_tensor(state_next, self.dq_player.player_name))
          self.done_history.append(done)
          self.rewards_history.append(reward)

          state = state_next

          #training
          if len(self.action_history) > self.batch_size:

            # Get indices of samples for replay buffers
            indices = np.random.choice(range(len(self.action_history)), size=self.batch_size)
            rewards_sample = self.rewards_history[0]

            # Using list comprehension to sample from replay buffer
            state_sample = tf.squeeze(np.array([self.state_history[i] for i in indices]))
            state_next_sample = tf.squeeze(np.array([self.state_next_history[i] for i in indices]))
            
            if self.batch_size == 1:
              state_sample = tf.expand_dims(state_sample, axis = 0)
              state_next_sample = tf.expand_dims(state_next_sample, axis = 0)
              
            rewards_sample = [self.rewards_history[i] for i in indices]
            action_sample = [self.action_history[i] for i in indices]

            done_sample = tf.convert_to_tensor([float(self.done_history[i]) for i in indices])

            masks = tf.one_hot(action_sample, 9)
            
            with tf.GradientTape() as tape:
                prediction = self.dq_player.model(state_sample)
                prediction = tf.reduce_sum(tf.multiply(prediction, masks), axis=1)
                future_rewards = self.dq_player.model_target(state_next_sample).numpy()
                target = rewards_sample + self.gamma * (1-done_sample) * np.max(future_rewards, axis = 1)


                loss = self.dq_player.loss_function(target, prediction)
                self.loss_history.append(loss.numpy())

                # Backpropagation
                grads = tape.gradient(loss, self.dq_player.model.trainable_variables)
                self.optimizer.apply_gradients(zip(grads, self.dq_player.model.trainable_variables))

          if len(self.rewards_history) > self.max_memory_length:
                  del self.rewards_history[:1]
                  del self.state_history[:1]
                  del self.state_next_history[:1]
                  del self.action_history[:1]
                  del self.done_history[:1]

          if done:  #if the game is ended
            break

        self.episode_reward_history.append(episode_reward)

        #mean rewards and test if self.test == true
        if self.episode_count % self.n_train == 0:
          self.reward_mean.append(np.mean(self.episode_reward_history))
          self.episode_reward_history = []
          if self.test:
            self.test_games(n_test = self.n_test)
            if print_time and self.M_rand[-1] > self.M_rand_threshold and self.M_opt[-1] > self.M_opt_threshold :
              end = time.time()
              print('Converged in ', end-start)
              print_time = False
            #self.opponent.epsilon = 0.5
        
        #decreasing learning rate
        if self.episode_count % 2500 == 0:
          self.alpha = 0.5*self.alpha
          self.optimizer = keras.optimizers.Adam(learning_rate = self.alpha)
                
        if self.episode_count % self.n_train == 0 :
          self.loss_mean.append(np.mean(self.loss_history))
          self.loss_history = []

        if self.episode_count % self.update_target_network == 0:
            # update the the target network with new weights
            self.dq_player.model_target.set_weights(self.dq_player.model.get_weights())
            # Log details
            template = "Update target network at episode {}"
            print(template.format(self.episode_count))

            
        if self.episode_count >= self.N :
          print("Done {} training games".format(self.N))
          return self.M_rand, self.M_opt


  def test_games(self, eps_dq_player = 0., n_test = 500):
      """
      Function to play n test games
      INPUTS:
      - eps_dq_player: exploration level of dq_player
      - n_test: number of test games
      """
      rewards_rand = 0
      rewards_opt = 0
      for i in range(n_test):
        self.dq_player.set_player('O',i)
        self.opponent.set_player('X',i)
        new_game = self.game_test(eps_dq_player, 1.)
        rewards_rand += new_game      
        rewards_opt += self.game_test(eps_dq_player, 0.)
      rewards_rand = rewards_rand/n_test
      rewards_opt = rewards_opt/n_test
      self.M_rand.append(rewards_rand)
      self.M_opt.append(rewards_opt)

  def game_test(self, eps_dq_player = 0., eps_opponent = 0.):
        """
        Function to play a test game
        INPUTS:
        - eps_dq_player: exploration level of q_player
        - eps_opponent: exploration level of opponent
        """
        self.env.reset() # reset the board
        #setting exploration levels
        self.dq_player.epsilon = eps_dq_player
        self.opponent.epsilon = eps_opponent
        while not self.env.end: # while the game is not over
            
          if self.env.current_player == self.dq_player.player_name :
                # action of the q_player
                state_tensor = self.build_tensor(self.env.grid.copy(),self.dq_player.player_name)

                if self.dq_player.epsilon > np.random.rand(1)[0]: action = np.random.choice(self.num_actions)
                else:
                  # Predict action Q-values
                  action_values = self.dq_player.model_target(state_tensor, training=False)
                  # Take best action
                  action = tf.argmax(action_values[0]).numpy()
                try:
                  self.env.step(int(action))
                except ValueError:
                    self.env.end = True
                    self.env.winner = self.opponent.player
          else:
              # action of the opponent
              move_opp = self.opponent.act(self.env.grid)
              self.env.step(move_opp)

        # return reward of the q-player
        return self.env.reward(self.dq_player.player_name)


  def plots(self):
    """
    Function to plots avg loss and reward
    """    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,3))
    xx = np.arange(self.n_train,self.N+1,self.n_train)
    y1 = self.loss_mean
    y2 = self.reward_mean
    plt.figure()
    ax1.plot(xx,y1, label = "epsilon="+str(self.dq_player.epsilon))
    ax1.legend()
    ax1.grid()
    ax1.set_xlabel("Nb of games")
    ax1.set_ylabel("Avg loss")
    ax2.plot(xx,y2, label = "epsilon="+str(self.dq_player.epsilon))
    ax2.legend()
    ax2.grid()
    ax2.set_xlabel("Nb of games")
    ax2.set_ylabel("Avg reward")

## Question 11

In [None]:
## Question 11
random.seed(2022)
np.random.seed(2023)
N = 20000
epsilon = 0.1*np.ones([N,1])
dqn = DQLearning(epsilon = epsilon)
dqn.training()
dqn.plots()

## Question 12

In [None]:
## Question 12
random.seed(2022)
np.random.seed(2023)
batch_size = 1
buffer_size = 1
N = 20000
epsilon = 0.1 * np.ones([N,1])
dqn = DQLearning(epsilon = epsilon, buffer_size = buffer_size, batch_size = batch_size)
dqn.training()
dqn.plots()

## Question 13

In [None]:
## Question 13
random.seed(2022)
np.random.seed(2023)
e_min = 0.1
e_max = 0.8
N = 20000
N_star_vec = [1,10,100, 1000, 10000, 15000, 20000, 30000,40000]

plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

for N_star in N_star_vec:
  epsilon = []
  for i in np.arange(1,N+1):
      epsilon.append(max(e_min,e_max*(1-i/N_star)))

  dqn = DQLearning(epsilon = epsilon, test = True)
  M_rand, M_opt = dqn.training()
  print('N_star =' + str(N_star) + ' M_opt mean ' + str(np.mean(M_opt[40:])))
  print('N_star =' + str(N_star) + ' M_rand mean ' + str(np.mean(M_rand[40:])))
  ax1.plot(np.arange(1,len(M_opt)+1)*250, M_opt, label = 'N*='+str(N_star))
  ax1.set_title('M_opt')
  plt1.legend(bbox_to_anchor=(0.88,0.8), loc="upper left")
  ax1.grid()
  ax2.plot(np.arange(1,len(M_rand)+1)*250, M_rand, label = 'N*='+str(N_star))
  plt2.legend(bbox_to_anchor=(0.9,0.8), loc="upper left")
  ax2.grid()
  ax2.set_title('M_rand')

## Question 14

In [None]:
## Question 14
random.seed(2022)
np.random.seed(2023)
e_min = 0.1
e_max = 0.8
N = 20000
N_star = 10000
plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()
epsilon = []
for i in np.arange(1,N+1):
    epsilon.append(max(e_min,e_max*(1-i/N_star)))

eps_opt = [0, 0.2, 0.4, 0.6, 0.8, 1]
for eps in eps_opt:
    dqn = DQLearning(epsilon_opponent = eps, epsilon = epsilon, test = True)
    M_rand, M_opt = dqn.training(eps_opponent = eps)
    print('epsilon=' + str(eps) + ' M_opt mean ' + str(np.mean(M_opt[40:])))
    print('epsilon=' + str(eps) + ' M_rand max ' + str(np.max(M_rand)))
    print('epsilon=' + str(eps) + ' M_rand mean ' + str(np.mean(M_rand[40:])))
    ax1.plot(np.arange(1,len(M_opt)+1)*250, M_opt, label = 'eps='+str(eps))
    ax2.plot(np.arange(1,len(M_rand)+1)*250, M_rand, label = 'eps='+str(eps))
       
plt1.legend(bbox_to_anchor=(0.88,0.8), loc="upper left")
ax1.grid()
ax1.set_title('M_opt')  
plt2.legend(bbox_to_anchor=(0.9,0.8), loc="upper left")
ax2.grid()
ax2.set_title('M_rand') 

In [None]:
class SelfDQLearning:
  """
  Class implementing the Deep Q-learning algorithm (DQN) for the agent playing against itself
  """
  def __init__(self, epsilon, epsilon_opponent = 0.5, alpha = 1e-3, gamma = 0.99, player_name = 'O', N = 20000, n_train = 250, n_test = 500, buffer_size = 10000, batch_size = 64, test = True):
    self.epsilon = epsilon #exploration rate
    self.alpha = alpha #learning rate
    self.gamma = gamma #discount factor
    self.dq_player1 = DQPlayer(epsilon) #dq player 1
    self.dq_player2 = DQPlayer(epsilon) #dq player 2
    self.opponent = OptimalPlayer(epsilon_opponent) #optimal player, for test
    self.env = TictactoeEnv() 

    self.optimizer = keras.optimizers.Adam(learning_rate = self.alpha)
    self.N = N #total number of training games
    self.n_train = n_train #number of training games between test
    self.n_test = n_test #number of test games
    self.test = test #bool, true if you want to compute M_rand and M_opt
    self.batch_size = batch_size
    
    self.action_history = [] #vector collecting consecutive actions
    self.state_history = []  #vector collecting consecutive states 
    self.state_next_history = [] #vector collecting consecutive (next) states.
    self.rewards_history = [] #vector collecting consecutive history
    self.done_history = [] #vector collecting consecutive values of self.env.end

    self.M_rand = []
    self.M_opt = []

    self.episode_count = 0 # game counter
    self.num_actions = 9 # number of actions available

    self.max_memory_length = buffer_size # buffer size
    self.update_target_network = 500 # how often to update the target network
    self.model = Net()
    self.model_target = Net()
    self.loss_function = keras.losses.Huber() # Huber loss

    self.M_rand_threshold = 0.866 * 0.8
    self.M_opt_threshold = -0.0769 * 1.2

  def build_tensor(self, grid, player_name):
      """
      Functions that builds the tensor from the current grid.
      INPUTS:
      - grid: current grid
      - player_name: name of the dq-player 
      """
      my_state = grid.copy()
      opp_state = grid.copy()
      if player_name == 'X':
        my_state[my_state ==-1.] = 0.
        opp_state[opp_state==1.] = 0.
        opp_state[opp_state==-1.] = 1.
      if player_name == 'O':
        my_state[my_state==1.] = 0.
        my_state[my_state==-1.] = 1.
        opp_state[opp_state==-1.] = 0.

      my_tstate = tf.convert_to_tensor(my_state)
      opp_tstate = tf.convert_to_tensor(opp_state)

      ret_tensor = tf.stack([my_tstate,opp_tstate], axis = 2)

      ret_tensor = tf.reshape(ret_tensor, shape = [1,3,3,2])

      return ret_tensor

    
  def training(self):
    """
    Functions that runs self.N training games. If self.test = true, it runs self.n_test every self.n_train trainings.
    """
    self.episode_count = 0
    print_time = True
    start = time.time()
    while True:  # Run until solved, i.e. while episode_count < N
        self.dq_player1.set_epsilon(self.epsilon[self.episode_count])
        self.dq_player2.set_epsilon(self.epsilon[self.episode_count])
        self.episode_count += 1
        done = False

        #switch of the player names
        self.dq_player1.set_player('O', self.episode_count)
        self.dq_player2.set_player('X', self.episode_count+1)
        
        #first state = empty grid
        state1,_,_=self.env.reset()
        state_next1 = state1 #later re-defined
        current_player = self.env.current_player
        state_tensor = self.build_tensor(state1, self.env.current_player)

        #choice of the first action
        if self.dq_player1.epsilon > np.random.rand(1)[0]:
                # Take random action
                action1 = np.random.choice(self.num_actions)
        else:
                # Predict action Q-values
                action_values = self.model(state_tensor, training=False)
                # Take best action
                idx = np.argwhere(action_values[0].numpy() == np.amax(action_values[0].numpy()))[:,0]
                action1 = random.choice(idx)

        state2,_,_ = self.env.step(int(action1))
        state_tensor = self.build_tensor(state2, self.env.current_player)

        #choice of the second action
        if self.dq_player1.epsilon > np.random.rand(1)[0]:
                # Take random action
                action2 = np.random.choice(self.num_actions)
        else:
                # Predict action Q-values
                action_values = self.model(state_tensor, training=False)
                # Take best action
                idx = np.argwhere(action_values[0].numpy() == np.amax(action_values[0].numpy()))[:,0]
                action2 = random.choice(idx)


        while not done: # while the game is not over

          not_current_player = np.setdiff1d(["X","O"], [current_player])[0] 
          
          #check if action2 is available, otherwise self.env.end = true
          try:
            state_next1,_,_ = self.env.step(int(action2))
          except ValueError:
            self.env.end = True
            self.env.winner = current_player  #set the winner to be the opposite of the current player        

          if self.env.end: 
            self.action_history.append(action2)
            self.state_history.append(self.build_tensor(state2, not_current_player))
            self.state_next_history.append(self.build_tensor(self.env.grid, not_current_player))
            self.done_history.append(self.env.end)
            self.rewards_history.append(self.env.reward(not_current_player))


          done = self.env.end
          reward = self.env.reward(current_player)
          self.action_history.append(action1)
          self.state_history.append(self.build_tensor(state1, current_player))
          self.state_next_history.append(self.build_tensor(state_next1, current_player))
          self.done_history.append(self.env.end)
          self.rewards_history.append(self.env.reward(current_player))

          #updates of the actions and states
          action1 = action2
          state1 = state2 
          state2 = state_next1
          if not done: 
            state_tensor = self.build_tensor(state2, current_player)

          current_player = not_current_player

          #next action  
          if not self.env.end:
            if self.dq_player1.epsilon > np.random.rand(1)[0]:
                    # Take random action
                    action2 = np.random.choice(self.num_actions)
            else:
                    # Predict action Q-values
                    action_values = self.model(state_tensor, training=False)
                    # Take best action
                    idx = np.argwhere(action_values[0].numpy() == np.amax(action_values[0].numpy()))[:,0]
                    action2 = random.choice(idx)

          #training 
          if len(self.action_history) > self.batch_size:

              # Get indices of samples for replay buffers
              indices = np.random.choice(range(len(self.action_history)), size=self.batch_size)
              rewards_sample = self.rewards_history[0]

              # Using list comprehension to sample from replay buffer

              state_sample = tf.squeeze(np.array([self.state_history[i] for i in indices]))
              state_next_sample = tf.squeeze(np.array([self.state_next_history[i] for i in indices]))
              if self.batch_size == 1:
                state_sample = tf.expand_dims(state_sample, axis = 0)
                state_next_sample = tf.expand_dims(state_next_sample, axis = 0)
                
              rewards_sample = [self.rewards_history[i] for i in indices]
              action_sample = [self.action_history[i] for i in indices]

              done_sample = tf.convert_to_tensor([float(self.done_history[i]) for i in indices])

              masks = tf.one_hot(action_sample, 9)
              
              with tf.GradientTape() as tape:
                  prediction = self.model(state_sample)
                  prediction = tf.reduce_sum(tf.multiply(prediction, masks), axis=1)
                  future_rewards = self.model_target(state_next_sample).numpy()
                  target = rewards_sample + self.gamma * (1-done_sample) * np.max(future_rewards, axis = 1)
                  loss = self.loss_function(target, prediction)
                  # Backpropagation
                  grads = tape.gradient(loss, self.model.trainable_variables)
                  self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

          if len(self.rewards_history) > self.max_memory_length:
            del self.rewards_history[:1]
            del self.state_history[:1]
            del self.state_next_history[:1]
            del self.action_history[:1]
            del self.done_history[:1]
          
          if done: 
            break
        #update target model
        if self.episode_count % self.update_target_network == 0:
            # update the the target network with new weights
            self.model_target.set_weights(self.model.get_weights())
            # Log details
            template = "Update target network at episode {}"
            print(template.format(self.episode_count))

        #test if self.test = true
        if self.episode_count % self.n_train == 0 and self.test:
            self.test_games(n_test = self.n_test)
            self.opponent.epsilon = 0.5
            if print_time and self.M_rand[-1] >= self.M_rand_threshold and self.M_opt[-1] >= self.M_opt_threshold :
              end = time.time()
              print('Converged in ', end-start)
              print_time = False
        
        if self.episode_count % 2500 == 0:
          self.alpha = 0.5*self.alpha
          self.optimizer = keras.optimizers.Adam(learning_rate = self.alpha)

        if self.episode_count >= self.N :
          print("Done {} training games".format(self.N))
          return self.M_rand, self.M_opt


  def test_games(self, eps_dq_player = 0., n_test = 500):
      """
      Function to play n test games
      INPUTS:
      - eps_dq_player: exploration level of dq_player
      - n_test: number of test games
      """
      rewards_rand = 0
      rewards_opt = 0
      for i in range(n_test):
        self.dq_player1.set_player('O',i)
        self.opponent.set_player('X',i)
        new_game = self.game_test(eps_dq_player, 1.)
        rewards_rand += new_game      
        rewards_opt += self.game_test(eps_dq_player, 0.)
      rewards_rand = rewards_rand/n_test
      rewards_opt = rewards_opt/n_test
      self.M_rand.append(rewards_rand)
      self.M_opt.append(rewards_opt)

  def game_test(self, eps_dq_player = 0., eps_opponent = 0.):
        """
        Function to play a test game
        INPUTS:
        - eps_q_player: exploration level of q_player
        - eps_opponent: exploration level of opponent
        """
        self.env.reset() # reset the board
        #setting exploration levels
        self.dq_player1.epsilon = eps_dq_player
        self.opponent.epsilon = eps_opponent
        #print('eps opp test', self.opponent.epsilon)
        while not self.env.end: # while the game is not over
            
          if self.env.current_player == self.dq_player1.player_name :
                # action of the q_player
                state_tensor = self.build_tensor(self.env.grid.copy(),self.dq_player1.player_name)

                if self.dq_player1.epsilon > np.random.rand(1)[0]: action = np.random.choice(self.num_actions)
                else:
                  # Predict action Q-values
                  action_values = self.model_target(state_tensor, training=False)
                  # Take best action
                  action = tf.argmax(action_values[0]).numpy()
                try:
                  self.env.step(int(action))
                except ValueError:
                    self.env.end = True
                    self.env.winner = self.opponent.player
          else:
              # action of the opponent
              move_opp = self.opponent.act(self.env.grid)
              self.env.step(move_opp)

        # return reward of the q-player
        return self.env.reward(self.dq_player1.player_name)
        

## Question 16

In [None]:
## Question 16
np.random.seed(2022)
random.seed(2023)
eps_vector = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
rew_epsilon = []


n_train = 250
n_test = 500
N = 20000

plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

ax1.set_title('M_opt')
ax2.set_title('M_rand')

for epsilon in eps_vector:
    sdql = SelfDQLearning(epsilon*np.ones([N,1]), N = N, n_train = n_train,  test = True, n_test = n_test, alpha = 1e-3)
    M_rand_vec, M_opt_vec = sdql.training()
    ax1.plot(np.arange(1,len(M_opt_vec)+1)*250, M_opt_vec, label = 'eps='+str(epsilon))
    ax2.plot(np.arange(1,len(M_rand_vec)+1)*250, M_rand_vec, label = 'eps='+str(epsilon))

ax1.grid()
ax2.grid()

plt1.legend(bbox_to_anchor=(0.94,1), loc="upper left")
plt2.legend(bbox_to_anchor=(0.94,1), loc="upper left")


## Question 17

In [None]:
## Question 17
random.seed(2022)
np.random.seed(2023)
n_train = 250
n_test = 500
e_min = 0.1
e_max = 0.8
N = 20000
N_star_vec = [1,1000, 10000, 20000, 30000,40000]
batch_size = 64
buffer_size = 10000


plt1, ax1 = plt.subplots()
plt2, ax2 = plt.subplots()

for N_star in N_star_vec:
  epsilon = []
  for i in np.arange(1,N+1):
      epsilon.append(max(e_min,e_max*(1-i/N_star)))
  sdqn = SelfDQLearning(epsilon = epsilon, N = N, n_train = n_train, test = True, n_test = n_test, alpha = 1e-3)
  M_rand, M_opt = sdqn.training()
  print('N_star =' + str(N_star) + ' M_opt mean ' + str(np.mean(M_opt[40:])))
  print('N_star =' + str(N_star) + ' M_rand mean ' + str(np.mean(M_rand[40:])))
  ax1.plot(np.arange(1,len(M_opt)+1)*250, M_opt, label = 'N*='+str(N_star))
  ax1.set_title('M_opt')
  plt1.legend(bbox_to_anchor=(0.88,0.8), loc="upper left")
  ax1.grid()
  ax2.plot(np.arange(1,len(M_rand)+1)*250, M_rand, label = 'N*='+str(N_star))
  plt2.legend(bbox_to_anchor=(0.9,0.8), loc="upper left")
  ax2.grid()
  ax2.set_title('M_rand')

## Question 19

In [None]:
## Question 19
import seaborn as sn
# first state
grid1 = np.array([[0.,1.,1.],[0., -1., -1], [0.,1.,-1]])
state1 = sdqn.build_tensor(grid1,'X')
out1 = sdqn.model_target(state1)
sn.heatmap(out1.numpy().reshape(3,3), center = 0, square = True, linewidths=.5)

In [None]:
# second state
grid2 = np.array([[0.,1.,0.],[0., 0, -1], [0.,1.,-1]])
state2 = sdqn.build_tensor(grid2,'X')
out2 = sdqn.model_target(state2)
sn.heatmap(out2.numpy().reshape(3,3), center = 0, square = True, linewidths=.5)

In [None]:
# third state
grid3 = np.array([[1.,0.,0.],[0.,-1,0.],[0.,0.,0.]])
state3 = sdqn.build_tensor(grid3,'X')
out3 = sdqn.model_target(state3)
sn.heatmap(out3.numpy().reshape(3,3), center = 0, square = True, linewidths=.5)