Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB10

Use reinforcement learning to devise a tic-tac-toe player.

### Deadlines:

* Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus)
* Reviews: [Befana](https://en.wikipedia.org/wiki/Befana)

Notes:

* Reviews will be assigned on Monday, December 4
* You need to commit in order to be selected as a reviewer (i.e., it is better to commit an empty work than not to commit at all)

In [8]:
from abc import ABC, abstractmethod
from copy import deepcopy
from tqdm import tqdm
import numpy as np
import random
import pickle
import sys
import io

## Player

In [9]:
class Player(ABC):
    '''Abstract base class shared by every tic-tac-toe participant.'''

    def __init__(self, name: str) -> None:
        '''Store the display name; subclasses may extend this to keep state/memory.'''
        self.name = name

    def __str__(self) -> str:
        '''A player is printed as its name.'''
        return str(self.name)

    def is_agent(self) -> bool:
        '''Tell whether this player is a MyAgent (or a subclass of it).'''
        return isinstance(self, MyAgent)

    @abstractmethod
    def make_move(self, game: 'Game') -> tuple[int, int]:
        '''
        Pick the next move for this player.

        game: the tic-tac-toe game being played; the move is applied by the game itself, not by the player
        return values: a tuple of two indices that the game uses to address one cell of its 3x3 board
        '''


## Game

In [10]:
class Game(object):
    '''
    A 3x3 tic-tac-toe board together with the turn loop.

    Cell values: -1 is empty, 0 belongs to the first player, 1 to the second.
    '''

    def __init__(self) -> None:
        self.winner = None
        # Starts at 1 so that the first increment in play() gives the opening turn to player 0.
        self._current_player_idx = 1
        # A signed dtype is required because -1 marks an empty cell
        # (an unsigned array cannot hold it).
        self._board = np.full((3, 3), -1, dtype=np.int8)
        self._available_moves_list = [(r, c) for r in range(3) for c in range(3)]
        # Indexed by cell value: 0 -> cross, 1 -> nought, -1 (last item) -> empty.
        self._emojis = ['❌', '⭕️', '⚪️']

    def __str__(self) -> str:
        '''Render the board, one text line per row, each tile followed by a space.'''
        return ''.join(
            ''.join(f'{self._emojis[tile]} ' for tile in row) + '\n'
            for row in self._board
        )

    def check_winner(self) -> int:
        '''
        Inspect the board.

        return values: 0 or 1 when that player owns a full row, column or diagonal,
        2 when no move is left and nobody won (tie), -1 when the game is still open
        '''
        size = self._board.shape[0]
        lines = [self._board[i, :] for i in range(size)]
        lines += [self._board[:, j] for j in range(size)]
        lines.append(self._board.diagonal())
        lines.append(np.fliplr(self._board).diagonal())

        for line in lines:
            owner = int(line[0])
            # A line of empty cells is uniform too, but it is not a win:
            # keep scanning the remaining lines.
            if owner != -1 and (line == owner).all():
                return owner

        # tie
        if not self._available_moves_list:
            return 2

        return -1

    def play(self, player1: 'Player', player2: 'Player') -> int:
        '''
        Play the game until it is won or tied.

        A move is accepted only if it is one of the currently available cells;
        otherwise the same player is asked again.

        return values: 0 or 1 (index of the winning player) or 2 for a tie;
        self.winner is set to the winning player object, or None for a tie
        '''
        players = [player1, player2]
        winner = -1
        while winner < 0:
            self._current_player_idx = (self._current_player_idx + 1) % len(players)
            current = players[self._current_player_idx]
            while True:
                move = tuple(current.make_move(self))
                # Membership test instead of indexing the board: rejects occupied,
                # out-of-range and negative coordinates without raising.
                if move in self._available_moves_list:
                    break
                if isinstance(current, HumanPlayer):
                    print("That's an invalid move, please reenter your move:")
            self._board[move] = self._current_player_idx
            self._available_moves_list.remove(move)
            winner = self.check_winner()

        self.winner = None if winner > 1 else players[winner]
        return winner

    def single_move(self, move: tuple[int, int]) -> None:
        '''
        Mark a cell for the current player without any validation.

        The list of available moves is NOT updated; in this file the method is
        only called on throw-away copies of the game to look one move ahead.
        '''
        self._board[move] = self._current_player_idx

    def get_available_moves(self) -> list[tuple[int, int]]:
        '''return the possible moves in the current position (the internal list, not a copy)'''
        return self._available_moves_list

    def get_hash(self) -> str:
        '''hashes the state of the board (string form of the flattened board)'''
        return str(self._board.reshape(3 * 3))
    

## Players Strategies

In [11]:
class RandomPlayer(Player):
    '''Baseline opponent that plays a uniformly random legal move.'''

    def make_move(self, game: 'Game') -> tuple[int, int]:
        '''
        Pick one of the free cells at random.

        Sampling from the available moves yields the same uniform distribution
        as drawing any of the 9 cells and retrying on occupied ones, but never
        hands an illegal move back to the game loop.

        game: the game being played
        return values: one of the tuples listed by game.get_available_moves()
        raises: IndexError if the game has no available move left
        '''
        return random.choice(game.get_available_moves())

class MyAgent(Player):
    '''
    Reinforcement-learning player backed by a table of state values.

    Each move is epsilon-greedy: with probability exp_rate a random available
    move is played, otherwise the move leading to the best-valued known state.
    The states visited during a game are remembered so that feed_reward() can
    propagate the final reward backwards through them.
    '''

    def __init__(self, name: str, exp_rate=0.3) -> None:
        super().__init__(name)
        self._states = []          # hashes of the states reached by this agent in the current game
        self._state_value = {}     # board hash -> learned value
        self._lr = 0.2             # learning rate
        self._exp_rate = exp_rate  # probability of taking a random (exploratory) move
        self._decay_gamma = 0.9    # discount applied to the reward at every step backwards

    def make_move(self, game: 'Game') -> tuple[int, int]:
        '''Choose a move for the given game and record the resulting state.'''
        return self.__choose_action(game)

    def feed_reward(self, reward: float) -> None:
        '''
        Back-propagate the final reward through the states of the last game.

        Walking from the last state to the first, each value moves towards the
        discounted reward by a fraction lr; the updated value then becomes the
        reward seen by the previous state.
        '''
        for st in reversed(self._states):
            value = self._state_value.get(st, 0)
            value += self._lr * (self._decay_gamma * reward - value)
            self._state_value[st] = value
            reward = value

    @staticmethod
    def _hash_after(game: 'Game', move: tuple[int, int]) -> str:
        '''Hash of the board obtained by playing move on a copy of game.'''
        lookahead = deepcopy(game)
        lookahead.single_move(move)
        return lookahead.get_hash()

    def __choose_action(self, game: 'Game') -> tuple[int, int]:
        '''
        Epsilon-greedy move selection.

        raises: ValueError if the game has no available move
        '''
        possible_moves = game.get_available_moves()
        if not possible_moves:
            raise ValueError('cannot choose an action: the game has no available moves')

        if np.random.uniform(0, 1) <= self._exp_rate:
            # take random action
            action = possible_moves[np.random.choice(len(possible_moves))]
            chosen_hash = self._hash_after(game, action)
        else:
            value_max = float('-inf')
            for pm in possible_moves:
                candidate_hash = self._hash_after(game, pm)
                # Unknown states are valued 0.
                value = self._state_value.get(candidate_hash, 0)
                # '>=' keeps the original tie-break: the last best move wins.
                if value >= value_max:
                    value_max, action, chosen_hash = value, pm, candidate_hash

        self._states.append(chosen_hash)
        return action

    def reset_states(self) -> None:
        '''Forget the states visited in the game that just ended.'''
        self._states.clear()

    def save_policy(self) -> None:
        '''Pickle the value table to the file 'policy_<name>' in the working directory.'''
        with open('policy_' + str(self.name), 'wb') as fw:
            pickle.dump(self._state_value, fw)

    def load_policy(self, file) -> None:
        '''
        Replace the value table with the one pickled in file.

        SECURITY NOTE: pickle can execute arbitrary code while loading; only
        load policy files that you created yourself.
        '''
        with open(file, 'rb') as fr:
            self._state_value = pickle.load(fr)

    def set_exp_rate(self, exp_rate: float=0.3) -> None:
        '''Set the exploration probability (0 makes the agent fully greedy).'''
        self._exp_rate = exp_rate

class HumanPlayer(Player):
    '''Player whose moves are typed on the console.'''

    def make_move(self, game: 'Game') -> tuple[int, int]:
        '''
        Ask the user for the two coordinates of the next move.

        Input that is not an integer is rejected and asked again instead of
        aborting the match; whether the cell is actually playable is left to
        the game loop.

        return values: the tuple (x, y) exactly as typed, in that order
        '''
        while True:
            try:
                x = int(input("Input the x coordinate (from 0 to 2):"))
                y = int(input("Input the y coordinate (from 0 to 2):"))
            except ValueError:
                print("Please enter integer coordinates.")
                continue
            return (x, y)

## Model

In [12]:
class Model(object):
    '''
    Pair two players and run training, testing or a single match between them.

    player1, player2: the kind of each player, one of 'RandomPlayer', 'HumanPlayer', 'MyAgent'
    name1, name2: display names (for agents, also the suffix of the saved policy file)
    policy1, policy2: optional policy files to load into the corresponding agent
    testing: accepted for backward compatibility; it is not used
    '''

    # Final reward fed to (player1, player2), keyed by the result of Game.play():
    # 0 = player1 won, 1 = player2 won, 2 = tie.
    _REWARDS = {0: (1, 0), 1: (0, 1), 2: (0.1, 0.5)}

    def __init__(self, player1: str, player2: str, name1: str='player1', name2: str='player2', policy1: str=None, policy2: str=None, testing: bool=False) -> None:
        self._players_map = {'RandomPlayer': RandomPlayer, 'HumanPlayer': HumanPlayer, 'MyAgent': MyAgent}
        self._player1 = self._build_player(player1, name1)
        self._player2 = self._build_player(player2, name2)

        self._maybe_load_policy(self._player1, policy1, 'policy1', 'player1')
        self._maybe_load_policy(self._player2, policy2, 'policy2', 'player2')

    def _build_player(self, kind: str, name: str):
        '''Instantiate the player class registered under kind, or raise ValueError.'''
        player_cls = self._players_map.get(kind)
        if player_cls is None:
            raise ValueError(f"unknown player kind {kind!r}; expected one of {sorted(self._players_map)}")
        return player_cls(name=name)

    @staticmethod
    def _maybe_load_policy(player, policy, policy_label: str, player_label: str) -> None:
        '''Load policy into player when one is given; warn if the player cannot use it.'''
        if policy is None:
            return
        if player.is_agent():
            player.load_policy(policy)
        else:
            print(f"WARNING: {policy_label} was not loaded to {player_label}, since it's not an instance of MyAgent.")

    def training(self, rounds=1000) -> None:
        '''
        Play rounds games, rewarding every agent after each one, then save the agents' policies.

        Nothing happens (apart from an error message) when neither player is an agent.
        '''
        if not (self._player1.is_agent() or self._player2.is_agent()):
            print("ERROR: cannot start training with no agents.")
            return

        # The policy file name is derived from the player name: make the two
        # names differ so that one policy does not overwrite the other.
        if self._player1.name == self._player2.name:
            if self._player2.name != "player2":
                self._player2.name = "player2"
            else:
                self._player1.name = "player1"

        agents = [(slot, player) for slot, player in enumerate((self._player1, self._player2)) if player.is_agent()]

        for _ in tqdm(range(rounds)):
            game = Game()
            winner = game.play(self._player1, self._player2)

            rewards = self._REWARDS.get(int(winner))
            if rewards is None:
                continue
            for slot, agent in agents:
                agent.feed_reward(rewards[slot])
                agent.reset_states()

        for _, agent in agents:
            agent.save_policy()

    def testing(self, rounds=1000) -> None:
        '''
        Play rounds games with exploration switched off and print win/draw rates.

        Note: the exploration rate of the agents is left at 0 afterwards.
        '''
        if rounds <= 0:
            print("ERROR: rounds must be a positive integer.")
            return

        wins = [0, 0]

        for player in (self._player1, self._player2):
            if player.is_agent():
                player.set_exp_rate(0)

        for _ in tqdm(range(rounds)):
            game = Game()
            winner = game.play(self._player1, self._player2)
            if game.winner is not None:
                wins[int(winner)] += 1

        win_rate_p1 = (wins[0]/rounds)*100
        draws = ((rounds - sum(wins))/rounds)*100

        print(f"The results of the match [{type(self._player1)} vs {type(self._player2)}] are shown here:")
        print(f"The win rate for the player1 is {win_rate_p1:.2f}% on a total of {rounds} matches")
        print(f"The two players drew {draws:.2f}% of the games")

    def single_match(self) -> None:
        '''Play one game and announce the outcome.'''
        game = Game()
        game.play(self._player1, self._player2)

        if game.winner is None:
            print("The game ended in a draw")
        else:
            print(f"{game.winner} has won!")

## Training and Testing

In [20]:
# Training recipe, kept for reference: resume from the 100k-game policies with two agents
# playing each other; training() saves each agent's table as 'policy_<name>', i.e. under
# the 200k-game names used below.
# model = Model(player1='MyAgent', player2='MyAgent', policy1='policy_100kgames_p1', policy2='policy_100kgames_p2', name1='200kgames_p1', name2='200kgames_p2')
# model.training(rounds=100000)
# Evaluation: the agent loads the saved first-player policy and plays against a random
# opponent; testing() switches the agent's exploration off before playing.
model = Model(player1='MyAgent', player2='RandomPlayer', policy1='policy_200kgames_p1')
model.testing(rounds=5000)

100%|██████████| 5000/5000 [00:10<00:00, 497.71it/s]

The results of the match [<class '__main__.MyAgent'> vs <class '__main__.RandomPlayer'>] are shown here:
The win rate for the player1 is 97.26% on a total of 5000 matches
Thw two players drew 2.74% of the games





## Single Match with Human

In [14]:
# model = Model(player1='HumanPlayer', player2='RandomPlayer')
# model.single_match()