In [9]:
import random
from collections import deque, namedtuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.functional as F  # NOTE(review): relu & co. live in torch.nn.functional, not here
import tqdm

In [None]:
def get_other_player(self, player):
    """
    Get the other opponent player name
    :param self: unused; kept so existing call sites keep working
    :param player: the current player name ("X" or "O"), or an object
                   exposing it as ``player_name``
    :return: the opponent player name
    """
    # Callers pass either the bare name or the player object itself.
    name = getattr(player, "player_name", player)
    return "X" if name == "O" else "O"


def grid_to_state(self, env, player):
    """
    Convert the grid of the environment to a tensor made of two planes:
    the cells held by ``player`` followed by the cells held by the opponent
    :param self: unused; kept so existing call sites keep working
    :param env: the current environement of the game; must expose ``grid``
                and ``player2value``
    :param player: our current learner (an object with ``player_name``) or
                   directly its name
    :return: a float tensor of shape (2, *grid.shape) holding 1.0 where the
             corresponding player occupies the cell and 0.0 elsewhere
    """
    own_name = getattr(player, "player_name", player)

    # The original code calls player2value like a function; a plain mapping
    # (e.g. a dict) is accepted as well since the env class is not visible here.
    lookup = env.player2value
    to_value = lookup if callable(lookup) else lookup.__getitem__

    grid = torch.as_tensor(env.grid)
    own_cells = grid == to_value(own_name)
    other_cells = grid == to_value(get_other_player(self, own_name))

    # Linear layers need floating point inputs, hence the cast from bool.
    return torch.stack([own_cells, other_cells]).float()

In [10]:
class DeepQNetwork(nn.Module):
    """
    Class representing our Neural Network to estimate the Q-values of each play
    It is composed of two hidden layers of 100 ReLU units; it takes 18 input
    values per state and outputs 9 values, one per action
    """
    def __init__(self):
        # nn.Module has to be initialised before any sub-module is assigned.
        super().__init__()
        self.hidden1 = nn.Linear(18, 100)
        self.hidden2 = nn.Linear(100, 100)
        self.output = nn.Linear(100, 9)

    def forward(self, x):
        """
        Estimate the Q-values of one state or of a batch of states
        :param x: tensor whose number of elements is a multiple of 18
                  (any shape and dtype, e.g. (2, 3, 3) or (batch, 2, 3, 3))
        :return: tensor of shape (batch, 9) with one Q-value per action
        """
        # Flatten every state to one row of 18 features and match the dtype of
        # the weights so boolean / integer encodings are accepted too.
        x = x.reshape(-1, self.hidden1.in_features).to(self.hidden1.weight.dtype)
        x = nn.functional.relu(self.hidden1(x))
        x = nn.functional.relu(self.hidden2(x))
        return self.output(x)
    

In [None]:
# One replay-buffer record: the state acted in, the action taken,
# the state that followed and the reward that was observed.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)
 
class ReplayBuffer:
    """
    Bounded FIFO store of transitions; once `capacity` entries are held,
    adding a new one discards the oldest
    """

    def __init__(self, capacity):
        """
        :param capacity: the maximum number of transitions kept
        """
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """
        Build a Transition from the given fields and store it
        :param args: the fields of the transition, in Transition order
        """
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """
        Draw distinct stored transitions uniformly at random
        :param batch_size: how many transitions to draw
        :return: a list of `batch_size` transitions
        """
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """
        Number of transitions currently stored
        """
        return len(self.memory)

In [None]:
class DeepQPlayer():
    """
    Player following an epsilon-greedy policy whose greedy moves are the
    ones with the highest value according to the wrapped model
    """
    # Number of moves a random draw can pick from
    N_ACTIONS = 9

    def __init__(self, model):
        """
        :param model: callable returning one value per action for a state
        """
        self.model = model

    def act(self, epsilon, grid):
        """
        Choose a move to perform for the given state and the epsilon-greedy policy
        :param epsilon: the epislon value used in the epsilon-greedy policy,
                        i.e. the probability of playing a random move
        :param grid: the current state of the game, already encoded in the
                     layout the model expects (a single state, not a batch);
                     the player has no access to the environment so it cannot
                     encode a raw grid itself
        :return: a 0-dimensional integer tensor holding the index of the move
        """
        # Strict comparison: epsilon == 0 must never explore.
        if random.random() < epsilon:
            # Perform a random move
            return torch.tensor(random.randrange(self.N_ACTIONS))

        # Choose the best action according to the output of the estimation
        # network; no gradient is needed to pick a move.
        with torch.no_grad():
            q_values = self.model(torch.as_tensor(grid))
        # argmax gives the action index, whereas max would give its value.
        return q_values.argmax()
    

In [None]:
class DeepQTraining():
    """
    Training loop of a DeepQPlayer against an OptimalPlayer, using a replay
    buffer and the Huber loss to fit the Q-value network
    """
    def __init__(self):
        # All constants needed in the optimisation
        self.DISCOUNT_FACTOR = 0.99
        self.BUFFER_SIZE = 10000
        self.BATCH_SIZE = 64
        self.TARGET_NET_UPDATE_STEP = 500
        self.LEARNING_RATE = 5e-4
        self.NB_GAMES = 20000
        self.MAX_GAME_LENGTH = 9
        self.turns = np.array(['X', 'O'])
        self.N_ACTIONS = 9

    def reset_parameters(self):
        """
        Reset all training parameters to default
        """
        self.buffer = ReplayBuffer(self.BUFFER_SIZE)
        self.model = DeepQNetwork()
        self.agent1 = DeepQPlayer(self.model)
        self.agent2 = DeepQPlayer(self.model)
        # The trainable weights belong to the network, not to the player
        # wrapping it.
        self.optim = optim.Adam(self.model.parameters(), lr=self.LEARNING_RATE)
        self.criterion = nn.HuberLoss()
        self.env = TictactoeEnv()

    def model_optimiser(self):
        """
        Optimise the parameter of the model according to the DQN algorithm:
        one gradient step on a random batch of the replay buffer, moving
        Q(state, action) towards reward + discount * max Q(next_state, .)

        Transitions whose next_state is None are treated as terminal and do
        not bootstrap. Nothing happens while the buffer holds fewer than
        BATCH_SIZE transitions.
        """
        if len(self.buffer) < self.BATCH_SIZE:
            return

        samples = self.buffer.sample(self.BATCH_SIZE)

        states = torch.stack([torch.as_tensor(t.state) for t in samples])
        # int() accepts both plain integers and 0-dimensional tensors
        actions = torch.tensor([int(t.action) for t in samples], dtype=torch.long)
        rewards = torch.tensor([float(t.reward) for t in samples])

        # Q-value the network currently gives to the action that was played
        q_taken = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Best value reachable from the next state; stays 0 for terminal ones.
        # NOTE(review): no separate target network exists in this class, so
        # the online network is used for bootstrapping - confirm this is wanted.
        next_q = torch.zeros(len(samples))
        has_next = torch.tensor([t.next_state is not None for t in samples])
        if has_next.any():
            next_states = torch.stack(
                [torch.as_tensor(t.next_state) for t in samples if t.next_state is not None]
            )
            with torch.no_grad():
                next_q[has_next] = self.model(next_states).max(dim=1).values

        targets = rewards + self.DISCOUNT_FACTOR * next_q

        loss = self.criterion(q_taken, targets)
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

    def simulate_game(self, env, opt_player, learner, epsilon_greedy):
        """
        Simulate a full game with the different given player and store every
        move of the learner in the replay buffer
        :param env: the current environment of the game
        :param opt_player: the player with the optimal policy
        :param learner: our learner
        :param epsilon_greedy: the value to use for epsilon in the epsilon-greedy policy.
        """
        for _ in range(self.MAX_GAME_LENGTH):
            # Remember who plays this round: env.step may change the current player
            round_player = env.current_player
            state = None

            if round_player == opt_player.player_name:
                # If it is the turn of the optimal player, simply choose a move.
                move = opt_player.act(env.grid)
            else:
                # If it is the turn of our learner, encode the grid from its
                # point of view and let it pick; the move is turned into a
                # plain int before being given to the environment.
                state = grid_to_state(None, env, learner)
                move = int(learner.act(epsilon_greedy, state))

            if env.check_valid(move):
                # If the chosen move is valid, perform it and observe the reward
                _, end, _ = env.step(move)
                reward = env.reward(round_player)
            else:
                # If the chosen move is not valid, end the game and store a reward of -1
                end = True
                reward = -1

            if learner.player_name == round_player:
                if state is None:
                    state = grid_to_state(None, env, learner)
                # A finished game has no next state; None marks the transition
                # as terminal for model_optimiser.
                # NOTE(review): the next state is the grid right after the
                # learner's own move, before the opponent answers - confirm.
                next_state = None if end else grid_to_state(None, env, learner)
                self.buffer.push(state, move, next_state, reward)

            if end:
                # End the simulation of the game if the game is over
                return

    def train(self, epsilon_greedy, adversary_epsilon):
        """
        Play NB_GAMES games against an OptimalPlayer, alternating which
        player name the learner gets from one game to the next
        :param epsilon_greedy: callable giving the exploration rate of the
                               learner for a game index
        :param adversary_epsilon: the epsilon handed to the OptimalPlayer
        """
        # Reset all the parameters for the start of the training
        self.reset_parameters()

        # tqdm is imported as a module, the progress bar is tqdm.tqdm
        for e in tqdm.tqdm(range(self.NB_GAMES)):
            self.env.reset()
            # set the correct player names
            self.agent1.player_name = self.turns[(e + 1) % 2]
            opt_player = OptimalPlayer(adversary_epsilon, player_name=self.turns[e % 2])
            # Simulate a game for the current epoch
            self.simulate_game(self.env, opt_player, self.agent1, epsilon_greedy(e))

            # NOTE(review): the model is only optimised once every
            # TARGET_NET_UPDATE_STEP games, as in the original loop; the
            # constant's name suggests it was meant for a target-network
            # sync instead - confirm the intended schedule.
            if (e + 1) % self.TARGET_NET_UPDATE_STEP == 0:
                # If we reach the update epoch, optimize the model
                self.model_optimiser()