## Work
This project was designed, programmed and tested by
* Lorenzo Bonannella 
* Giacomo Fantino
* Farisan Fekri
* Giacomo Cauda

In [2]:
import random
from game import Game, Move, Player
from copy import deepcopy
import sys
import numpy as np

class RandomPlayer(Player):
    def __init__(self) -> None:
        super().__init__()

    def make_move(self, game: 'Game') -> tuple[tuple[int, int], Move]:
        #game.print()
        from_pos = (random.randint(0, 4), random.randint(0, 4))
        move = random.choice([Move.TOP, Move.BOTTOM, Move.LEFT, Move.RIGHT])
        return from_pos, move

## MinMax algorithm

We started by using a MinMax strategy.
Since the solution tree is huge we have to set a limit to the depth (cut-off). By doing some test we found that the evaluation of non terminal node could use the difference between zeroes and ones: in this way we are forcing our agent to choose to place more zeroes by picking neutral elements than picking zeroes tiles at the beginning of the games. The idea is that by putting more zeroes the chance of winning later is higher.

In [None]:

class MinMaxPlayer(Player):
    def __init__(self, max_depth=2) -> None:
        super().__init__()
        self.MAXIMUM_DEPTH = max_depth

    def make_move(self, game: 'Game') -> tuple[tuple[int, int], Move]:
        #game.print()
        _, combination = self.minmax(game, True, 0)
        return combination
    
    def minmax(self, game, maximizing_player, depth):
        # The MinMax algorithm function
        # It returns an evalution which a score for a certain player and the best move it can do
        if (game.check_winner() != -1 or depth==self.MAXIMUM_DEPTH):
            if game.check_winner() == 0:
                return 1, None
            elif game.check_winner() == 1:
                return -1, None
            else: # depth==self.MAXIMUM_DEPTH:
                #more zeroes the better since we are player 0
                num_zeros = np.count_nonzero(game._board == 0)
                num_ones = np.count_nonzero(game._board == 1)
                return (num_zeros - num_ones)/100, None
            #we take the solution with the highest number of 0

        depth = depth + 1 #increase depth        
        if maximizing_player:
            max_eval = float('-inf')
            best_move = None
            for possible_move in self.get_possible_moves(game, 0): #first player 
                new_game = self.get_new_board(game, possible_move, 0)
                eval, _ = self.minmax(new_game, False, depth)
                if eval > max_eval:
                    max_eval = eval
                    best_move = possible_move
            return max_eval, best_move
        else:
            min_eval = float('inf')
            best_move = None
            for possible_move in self.get_possible_moves(game, 1):  #second player 
                new_game = self.get_new_board(game, possible_move, 1)
                eval, _ = self.minmax(new_game, True, depth)
                if eval < min_eval:
                    min_eval = eval
                    best_move = possible_move
            return min_eval, best_move
    
    def get_new_board(self, game, possible_move, player):
        #We do a deepcopy of the game object because in MinMax we change the game not only the board attribute
        new_game = deepcopy(game)
        pos, move = possible_move
        res = new_game.move(pos, move, player) #we are trying the move 
        if not res:
            print(f'ERROR: SHOULDNT RECEIVE FALSE on {pos} and {move} player {player}')
            game.print()
            sys.exit(1)
        return new_game

    def acceptable_move(self, pos, mov):
         return not ((pos[1] == 0 and mov == Move.TOP) 
                     or (pos[1] == 4 and mov == Move.BOTTOM) 
                     or (pos[0] == 0 and mov == Move.LEFT)
                     or (pos[0] == 4 and mov == Move.RIGHT))

    def get_possible_moves(self, game, player):
        possible_pos = [(i, j) for i in range(0,5) for j in range(0, 5) 
                            if (i == 0 or i == 4 or j == 0 or j == 4) 
                            and (game._board[j,i] == -1 or game._board[j,i] == player)]
        
        possible_moves_pos = [(pos, mov) for pos in possible_pos for mov in [Move.TOP, Move.BOTTOM, Move.LEFT, Move.RIGHT]]
        #check if position and moves are feasable
        possible_moves_pos = [(pos, mov) for pos, mov in possible_moves_pos if self.acceptable_move(pos, mov)]
        return possible_moves_pos

To decide the limit of the depth we measured the time to evaluate the initial state (being that is the one with higher number of possible moves) for each possible depth. By looking at the results we decided to use depth equal to 2.

In [6]:
import time

initial_game = Game() #the empty board is the one it takes longer time

for depth in range(1, 4):
    player = MinMaxPlayer(max_depth=depth)
    start_time = time.time()

    _ = player.make_move(initial_game)

    end_time = time.time()

    print(f"Execution time for depth {depth}: {end_time - start_time} seconds")


Execution time for depth 1: 0.001154184341430664 seconds
Execution time for depth 2: 0.5223915576934814 seconds
Execution time for depth 3: 18.050224542617798 seconds


This is the function for testing our algorithm against a random player. We can see that our strategy was correct since we got almost 100% of winning rate.

In [None]:
wins = 0
player1 = MinMaxPlayer()
player2 = RandomPlayer()
for i in range(1000):
    print(f'starting game {i}')
    g = Game()
    winner = g.play(player1, player2)
    
    if winner == 0:
        wins += 1

print(f'I won {wins} time out of 1000')

## MinMax algorithm with Alpha-Beta Pruning

Since the depth is very low due to the branching factor of the algorithm we decided to use alpha beta pruning to avoid to compute most of the subtrees.
The strategy is the same as before (picking more zeroes than one) but now the _minmax function includes the alpha beta pruning.
The basic idea is: in the case of the max player we receive from the parent node (min player) in the the current minimum value, in the variable beta. If we find a value which is higher than the minimum we can immediately stop because we won't change the decision of the parent node. The same happens when we are computing the min player.

In [None]:
class MinMaxPlayer(Player):
    def __init__(self, max_depth=4) -> None:
        super().__init__()
        self.MAXIMUM_DEPTH = max_depth

    def make_move(self, game: 'Game') -> tuple[tuple[int, int], Move]:
        #game.print()
        _, combination = self.minmax_alpha_beta(game, 0, -float('inf'), float('inf'), True)
        return combination

    def minmax_alpha_beta(self, game, depth, alpha, beta, maximizing_player):
        # The MinMax algorithm function with Alpha-Beta Pruning
        # alpha = current maximum value that we have found on that sub-branch
        # beta  = current minimum value that we have on that sub-branch
        # Base case: If the game is over or maximum depth is reached
        if (game.check_winner() != -1 or depth==self.MAXIMUM_DEPTH):
            if game.check_winner() == 0:
                return 1, None
            elif game.check_winner() == 1:
                return -1, None
            else: # depth==self.MAXIMUM_DEPTH 
                #more zeroes the better since we are player 0
                num_zeros = np.count_nonzero(game._board == 0)
                num_ones = np.count_nonzero(game._board == 1)
                return (num_zeros - num_ones)/100, None 
            #we take the solution with the highest number of 0
            
        depth = depth + 1 #increase depth 
        
        if maximizing_player:
            max_eval = float('-inf')
            best_move = None
            for possible_move in self.get_possible_moves(game, 0):
                new_game = self.get_new_board(game, possible_move, 0)
                eval, _ = self.minmax_alpha_beta(new_game, depth, alpha, beta, False)
                if eval > max_eval:
                    max_eval = eval
                    best_move = possible_move
                alpha = max(alpha, eval)
                if beta <= alpha:
                    break  # Beta cutoff    
            return max_eval, best_move
        else:
            min_eval = float('inf')
            best_move = None
            for possible_move in self.get_possible_moves(game, 1):  #second player
                new_game = self.get_new_board(game, possible_move, 1)
                eval, _ = self.minmax_alpha_beta(new_game, depth, alpha, beta, True)
                if eval < min_eval:
                    min_eval = eval
                    best_move = possible_move
                beta = min(beta, eval)
                if alpha > beta:
                    break  # Alpha cutoff
            return min_eval, best_move
    
    def get_new_board(self, game, possible_move, player):
        #We do a deepcopy of the game object because in MinMax we change the game not only the board attribute
        new_game = deepcopy(game)
        pos, move = possible_move
        res = new_game.move(pos, move, player) #we are trying the move 
        if not res:
            print(f'ERROR: SHOULDNT RECEIVE FALSE on {pos} and {move} player {player}')
            game.print()
            sys.exit(1)
        return new_game

    def acceptable_move(self, pos, mov):
         return not ((pos[1] == 0 and mov == Move.TOP) 
                     or (pos[1] == 4 and mov == Move.BOTTOM) 
                     or (pos[0] == 0 and mov == Move.LEFT) 
                     or (pos[0] == 4 and mov == Move.RIGHT))

    def get_possible_moves(self, game, player):
        possible_pos = [(i, j) for i in range(0,5) for j in range(0, 5) 
                            if (i == 0 or i == 4 or j == 0 or j == 4) 
                            and (game._board[j,i] == -1 or game._board[j,i] == player)]
        
        possible_moves_pos = [(pos, mov) for pos in possible_pos for mov in [Move.TOP, Move.BOTTOM, Move.LEFT, Move.RIGHT]]
        #check if position and moves are feasable
        possible_moves_pos = [(pos, mov) for pos, mov in possible_moves_pos if self.acceptable_move(pos, mov)]
        return possible_moves_pos

By reusing the same function as before we can see the huge improvement of using alpha beta pruning.

In [13]:
import time

initial_game = Game() #the empty board is the one it takes longer time

for depth in range(1, 6):
    player = MinMaxPlayer(max_depth=depth)
    start_time = time.time()

    _ = player.make_move(initial_game)

    end_time = time.time()

    print(f"Execution time for depth {depth}: {end_time - start_time} seconds")


Execution time for depth 1: 0.014258146286010742 seconds
Execution time for depth 2: 0.03733420372009277 seconds
Execution time for depth 3: 0.4546060562133789 seconds
Execution time for depth 4: 1.267000436782837 seconds
Execution time for depth 5: 21.52586340904236 seconds


The results are the same as before but by printing the board after each move we concluded that the chosen moves done by minmax were more rational.

In [None]:
wins = 0
player1 = MinMaxPlayer()
player2 = RandomPlayer()
for i in range(1000):
    print(f'starting game {i}')
    g = Game()
    winner = g.play(player1, player2)
    
    if winner == 0:
        wins += 1

print(f'I won {wins} time out of 1000')

## Reinforcement Learning with Monte Carlo approach

We decided to test a Reinforcement learning with this game: the agent will plays a certain number of games in a training phase to estimate a Q_function. We assume that by playing a lot of games the Q value of a node will converge to the expected reward of that node. Still we had some doubt since the tree is huge, so it may takes a lot of iterations to actually explore it.

The Monte Carlo approach is the building base of this algorithm: we play a set of random games, see the results and use those to update all the nodes in the trajectories. We used a learning rate (the update weight) of 0.01.

In [None]:
from random import choice
from collections import defaultdict

class RL_Player(Player):
    def __init__(self) -> None:
        super().__init__()
        self.Q_func = defaultdict(float)

    '''
    Once we trained the agent, we use this function to play
    '''
    def make_move(self, game: 'Game') -> tuple[tuple[int, int], Move]:
        #game.print()
        moves = self.get_possible_moves(game, 0)
        #we assume we want the maximum reward possible
        reward = -float('inf')
        best_comb = None
        board_tuple = tuple(map(tuple, game._board))
        #let's get the best action for this scenario from the Q-function
        for mov in moves:
            rew = self.Q_func[(board_tuple, mov)]
            if rew > reward:
                reward = rew
                best_comb = mov
        
        from_pos, move = best_comb
        return from_pos, move
    
    def __random_game(self):
        state = Game()
        trajectory = []

        while True:
            #first player is US
            x = choice(self.get_possible_moves(state, 0))
            trajectory.append((deepcopy(state), x)) #current state + chosen action
            pos, move = x
            state.move(pos, move, 0)

            #check_winner != -1 because i can make a bad action
            if state.check_winner() != -1:
                break

            #opponent player (doesn't consider the state)
            y = choice(self.get_possible_moves(state, 1))
            pos, move = y
            state.move(pos, move, 1)

            if state.check_winner() != -1:
                break
            
        return trajectory, state #sequence of move for computing the state value function

    def state_value(self, state):
        return 1 if state.check_winner() == 0 else -1
    
    def train_agent(self, iterations, epsilon = 0.01):
        for _ in range(iterations):
            #print(f"ITERATION {i}")
            trajectory, last_state = self.__random_game()
            final_reward = self.state_value(last_state)
            for (state, action) in trajectory:
                #convert numpy to tuple to make it hashable
                board_tuple = tuple(map(tuple, state._board))
                hash_state = (board_tuple, action)
                difference = (final_reward - self.Q_func[hash_state])
                self.Q_func[hash_state] = self.Q_func[hash_state] + epsilon*difference
    

    def get_possible_moves(self, game, player):
        possible_pos = [(i, j) for i in range(0,5) for j in range(0, 5) 
                            if (i == 0 or i == 4 or j == 0 or j == 4) 
                            and (game._board[j,i] == -1 or game._board[j,i] == player)]
        
        possible_moves_pos = [(pos, mov) for pos in possible_pos for mov in [Move.TOP, Move.BOTTOM, Move.LEFT, Move.RIGHT]]
        #check if position and moves are feasable
        possible_moves_pos = [(pos, mov) for pos, mov in possible_moves_pos if self.acceptable_move(pos, mov)]
        return possible_moves_pos

    def acceptable_move(self, pos, mov):
         return not ((pos[1] == 0 and mov == Move.TOP) 
                     or (pos[1] == 4 and mov == Move.BOTTOM) 
                     or (pos[0] == 0 and mov == Move.LEFT)
                     or (pos[0] == 4 and mov == Move.RIGHT))

    def best_action_states(self, n):
        top = (sorted(self.Q_func.items(), key=lambda e : e[1], reverse = True)[:n])
        for key, value in top:
            print(f'key : {key}')
            print(f'value: {value}')
            print('')

Here the code to train and then test our agent:

In [None]:
player1 = RL_Player()
player2 = RandomPlayer()

training_its = 200_000
player1.train_agent(training_its)

win = 0
for _ in range(1000):
    g = Game()
    winner = g.play(player1, player2)
    if winner == 0:
        win += 1

print(f'We won {win} out of 1000 games')

Our doubts were correct: with 200_000 iterations the process took more than one hour and the results were not interesting. We tried changing the iterations and the learning rate:

<img src="./loss.jpeg" alt="Plot of the loss functions" width=600/>

## Reinforcement Learning with Monte Carlo approach and Parallization

For this last agent we added two strategies to the RL:
* now it's possible to save the Q function and reload it later to be trained. This gave us the possibilites to train our agent for many iterations without our computers crashing.
* Since the computation of a single game is independent from the others we used Thread Parallelization to speed up the execution.

As we can see from the code we used two optimizations: ThreadPool and Future functions.
1) With threadpool we don't create a thread for each function call but only a fixed amount. Each thread will look at the queue and execute the function.
2) With Future we also parallelize the processing of the results: instead of waiting for all the threads to finish each time a thread stop we immediately process the result in a callback-fashion.

In [None]:
import concurrent.futures
import pickle

class RL_Player(Player):
    def __init__(self) -> None:
        super().__init__()
        self.Q_func = defaultdict(float)

    '''
    Once we trained the agent, we use this function to play
    '''
    def make_move(self, game: 'Game') -> tuple[tuple[int, int], Move]:
        #game.print()
        moves = self.get_possible_moves(game, 0)
        #we assume we want the maximum reward possible
        reward = -float('inf')
        best_comb = None
        board_tuple = tuple(map(tuple, game._board))
        #let's get the best action for this scenario from the Q-function
        for mov in moves:
            rew = self.Q_func[(board_tuple, mov)]
            if rew > reward:
                reward = rew
                best_comb = mov
        
        from_pos, move = best_comb
        return from_pos, move
    
    def _random_game(self):
        state = Game()
        trajectory = []

        while True:
            #first player is US
            x = choice(self.get_possible_moves(state, 0))
            trajectory.append((deepcopy(state), x)) #current state + chosen action
            pos, move = x
            state.move(pos, move, 0)

            #check_winner != -1 because i can make a bad action
            if state.check_winner() != -1:
                break

            #opponent player (doesn't consider the state)
            y = choice(self.get_possible_moves(state, 1))
            pos, move = y
            state.move(pos, move, 1)

            if state.check_winner() != -1:
                break
            
        return trajectory, state #sequence of move for computing the state value function

    def state_value(self, state):
        return 1 if state.check_winner() == 0 else -1
    
    def update_Q_func(self, hash_state, final_reward, epsilon):
        difference = (final_reward - self.Q_func[hash_state])
        self.Q_func[hash_state] = self.Q_func[hash_state] + epsilon * difference
    
    def train_agent(self, iterations, epsilon=0.01, save_dictionary=False):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self._random_game) for _ in range(iterations)]

            i = 1
            for future in concurrent.futures.as_completed(futures):
                i+=1
                print(f'process {i} finished')

                try:
                    trajectory, last_state = future.result()
                except Exception as e:
                    print(f'Error in process {i}: {e}')
                    continue  # Continue to the next iteration if there was an exception

                final_reward = self.state_value(last_state)

                for (state, action) in trajectory:
                    board_tuple = tuple(map(tuple, state._board))
                    hash_state = (board_tuple, action)
                    self.update_Q_func(hash_state, final_reward, epsilon)
    
        if save_dictionary:          
            # Specify the file path where you want to save the dictionary
            file_path = 'my_dictionary.pickle'

            pickle.dump(self.Q_func, open(file_path, 'wb')) #in case erase the old file
    
    def load_from_dictionary(self, file_path = 'my_dictionary.pickle'):
        # Load the dictionary from the file
        self.Q_func = pickle.load(open(file_path, 'rb'))
    
    def get_possible_moves(self, game, player):
        possible_pos = [(i, j) for i in range(0,5) for j in range(0, 5) 
                            if (i == 0 or i == 4 or j == 0 or j == 4) 
                            and (game._board[j,i] == -1 or game._board[j,i] == player)]
        
        possible_moves_pos = [(pos, mov) for pos in possible_pos for mov in [Move.TOP, Move.BOTTOM, Move.LEFT, Move.RIGHT]]
        #check if position and moves are feasable
        possible_moves_pos = [(pos, mov) for pos, mov in possible_moves_pos if self.acceptable_move(pos, mov)]
        return possible_moves_pos

    def acceptable_move(self, pos, mov):
         return not ((pos[1] == 0 and mov == Move.TOP) 
                     or (pos[1] == 4 and mov == Move.BOTTOM) 
                     or (pos[0] == 0 and mov == Move.LEFT)
                     or (pos[0] == 4 and mov == Move.RIGHT))

    def best_action_states(self, n):
        top = (sorted(self.Q_func.items(), key=lambda e : e[1], reverse = True)[:n])
        for key, value in top:
            print(f'key : {key}')
            print(f'value: {value}')
            print('')

We can start by seeing the improvements in the time:

| Iterations | old RL       | improvements|
|------------|--------------|-------------|
| 50_000     | 3 min 40 sec | -30 sec     |
| 200_000    | 1 hour 5 min | -20 min     |

We can see a correlations between iterations and saved time.

Performances wise we were able to reach more of a milion iterations by retraining the agent more times. Even by exploring a lot (the pickle file became 5GB) still the exploration was not enough and the results were only roughly 33% of winnings. This confirm the problem of exploring the entire quixo state space for the RL agent and confirm the minmax as the best agent.

In [None]:
#training phase
player1 = RL_Player()
player2 = RandomPlayer()

iterations = 200_000
player1.load_from_dictionary()
player1.train_agent(iterations, save_dictionary=True)
#save_dictionary: if it's true it overrides the file with the dictionary with the new one

In [None]:
#test phase
player1 = RL_Player()
player2 = RandomPlayer()

player1.load_from_dictionary()

win = 0
for _ in range(1000):
    g = Game()
    winner = g.play(player1, player2)
    if winner == 0:
        win += 1

print(f'We won {win} out of 1000 games')