Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB10

Use reinforcement learning to devise a tic-tac-toe player.

### Deadlines:

* Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus)
* Reviews: [Befana](https://en.wikipedia.org/wiki/Befana)

Notes:

* Reviews will be assigned on Monday, December 4
* You need to commit in order to be selected as a reviewer (i.e., it is better to commit an empty work than not to commit at all)

In [25]:
from copy import deepcopy
from enum import Enum
from itertools import combinations, permutations
from random import choice

from more_itertools import unique_everseen
from numpy import base_repr
from tqdm.auto import tqdm

We can model a Tic Tac Toe board as a magic square

| 2 | 9 | 4 |
|---|---|---|
| **7** | **5** | **3** |
| **6** | **1** | **8** |

To win, a player must hold three positions whose sum equals 15

In [26]:
# Magic-square value of each board cell in row-major order (index = 3 * row + column).
# Every row, column and diagonal of the square sums to 15.
MAGIC = [2, 9, 4, 7, 5, 3, 6, 1, 8]

class Status(Enum):
    """Outcome of inspecting a board, as reported by ``TicTacToe.check_status``."""
    ONGOING = 0  # nobody has won and free positions remain
    X_WINS = 1   # X holds three positions summing to 15
    O_WINS = 2   # O holds three positions summing to 15
    TIE = 3      # nobody has won and no free position is left
    INVALID = 4  # both players hold a winning triple

class Player(Enum):
    """Side of the game; X is the side that moves on an empty board."""
    NONE = 0  # NOTE(review): not referenced anywhere in the visible code
    X = 1
    O = 2

In [27]:
class TicTacToe:
    """Tic-tac-toe board stored as sets of magic-square positions.

    Every cell is identified by its value (1-9) in the 3x3 magic square
    ``MAGIC``; three distinct positions form a line exactly when they sum
    to 15.

    Attributes:
        x: positions held by player X.
        o: positions held by player O.
        available: positions that are still free.
    """

    # Clockwise 90° rotation of the board, as {from: to} magic-square positions.
    _ROTATE = {1: 7, 2: 4, 3: 1, 4: 8, 5: 5, 6: 2, 7: 9, 8: 6, 9: 3}
    # Vertical flip (top row <-> bottom row), as {from: to} positions.
    _FLIP = {1: 9, 2: 6, 3: 3, 4: 8, 5: 5, 6: 2, 7: 7, 8: 4, 9: 1}

    def __init__(self, *args) -> None:
        """Initialize a Tic-Tac-Toe board.

        * ``TicTacToe(moves)`` with a list of positions: the moves are played
          in order, X first, alternating with O.
        * ``TicTacToe(number)`` with an int: the board is rebuilt from its
          unique representation (see ``__int__``).
        * anything else (including no argument): an empty board.
        """
        first = args[0] if args else None
        if isinstance(first, list):
            self.x = set(first[0::2])  # moves at even indices belong to X
            self.o = set(first[1::2])  # moves at odd indices belong to O
            self.available = set(range(1, 10)) - set(first)
        elif isinstance(first, int):
            # ternary representation of the board, with leading zeros to
            # length 9; the leftmost digit describes position 9
            digits = base_repr(first, 3).rjust(9, '0')
            self.available = {9 - i for i, d in enumerate(digits) if d == '0'}
            self.x = {9 - i for i, d in enumerate(digits) if d == '1'}
            self.o = {9 - i for i, d in enumerate(digits) if d == '2'}
        else:
            self.x = set()
            self.o = set()
            self.available = set(range(1, 10))

    def __int__(self) -> int:
        """Represent the state of the board as a unique number.

        The board is read as a 9-digit base-3 number, from position 9 down to
        position 1, where 0 = free, 1 = X and 2 = O.
        """
        digits = ''.join(
            '1' if pos in self.x else '2' if pos in self.o else '0'
            for pos in range(9, 0, -1)
        )
        return int(digits, 3)

    def __str__(self) -> str:
        """Return three newline-terminated rows using 'x', 'o' and '-'."""
        def symbol(pos):
            if pos in self.x:
                return "x"
            if pos in self.o:
                return "o"
            return "-"

        return "".join(
            "".join(symbol(MAGIC[3 * r + c]) for c in range(3)) + "\n"
            for r in range(3)
        )

    @staticmethod
    def _has_line(marks) -> bool:
        """Return True if three distinct positions in ``marks`` sum to 15."""
        return any(sum(triple) == 15 for triple in combinations(marks, 3))

    def check_status(self) -> "Status":
        """Check if the game is terminated.

        Returns ``Status.INVALID`` when both players hold a winning triple,
        ``Status.X_WINS`` / ``Status.O_WINS`` when exactly one does,
        ``Status.TIE`` when nobody won and no position is free, and
        ``Status.ONGOING`` otherwise.
        """
        x_wins = self._has_line(self.x)
        o_wins = self._has_line(self.o)
        if x_wins and o_wins:
            return Status.INVALID
        if x_wins:
            return Status.X_WINS
        if o_wins:
            return Status.O_WINS
        if not self.available:
            return Status.TIE
        return Status.ONGOING

    def play(self, pos: int) -> bool:
        """Play a move on the board, in place.

        ``pos`` is the magic-square position of the move. The mark is given
        to X when an odd number of positions is free, to O otherwise.

        Returns True if the move was played, False if ``pos`` is not free.
        """
        if pos not in self.available:
            return False
        mover = self.x if len(self.available) % 2 == 1 else self.o
        mover.add(pos)
        self.available.remove(pos)
        return True

    def transform(self, sequence, revert=False):
        """Return a copy of the board with a sequence of transformations applied.

        Each transformation is a dictionary ``{from: to}`` over positions.
        With ``revert=True`` the inverse of every transformation is applied,
        in reverse order. The board itself is left untouched.
        """
        steps = list(sequence)
        if revert:
            steps = [
                {dst: src for src, dst in step.items()}
                for step in reversed(steps)
            ]
        new = deepcopy(self)
        for step in steps:
            new.available = {step[pos] for pos in new.available}
            new.x = {step[pos] for pos in new.x}
            new.o = {step[pos] for pos in new.o}
        return new

    def canonize(self):
        """Return the canonical equivalent board and the transformations reaching it.

        The candidates are the board itself and its images under the seven
        flip/rotation combinations listed below; the canonical one has the
        smallest ``int`` representation. Returns ``(board, morph)`` where
        ``morph`` is the list of transformations that maps ``self`` onto
        ``board`` (usable with ``transform``).

        When the board is already canonical the returned board is ``self``
        itself (not a copy) and ``morph`` is an empty list.
        """
        flip, rotate = self._FLIP, self._ROTATE
        morphs = [
            [flip],
            [rotate],
            [flip, rotate],
            [rotate, rotate],
            [flip, rotate, rotate],
            [rotate, rotate, rotate],
            [flip, rotate, rotate, rotate],
        ]
        best_board, best_morph, best_code = self, [], int(self)
        for morph in morphs:
            candidate = self.transform(morph)
            code = int(candidate)
            # strict comparison keeps the first board found for each code
            if code < best_code:
                best_board, best_morph, best_code = candidate, morph, code
        return best_board, best_morph

    def valid(self) -> bool:
        """Return False if the state is invalid, True otherwise.

        A state is invalid if X does not hold either as many marks as O or
        exactly one more, or if both players hold a winning triple.
        """
        if len(self.x) - len(self.o) not in (0, 1):
            return False
        return not (self._has_line(self.x) and self._has_line(self.o))

    def current_player(self) -> "Player":
        """Return the player that needs to make a move."""
        return Player.X if len(self.available) % 2 == 1 else Player.O


In [28]:
def random_agent(board:TicTacToe):
    """Random agent: pick uniformly at random one of the positions still free on the board."""
    free_positions = list(board.available)
    return choice(free_positions)

# Demo: play one game with random moves. Each move is chosen and played on the
# canonical form of the board, which is then mapped back to the real orientation.
ttt = TicTacToe()
while True:
    canon, morph = ttt.canonize()
    canon.play(random_agent(canon))
    ttt = canon.transform(morph, revert=True)
    print(ttt)
    status = ttt.check_status()
    if status != Status.ONGOING:
        break
print(status.name)

--x
---
---

--x
--o
---

-xx
--o
---

-xx
--o
-o-

xxx
--o
-o-

X_WINS


## Markov Decision Process

In [29]:
class MDP:
    """Markov Decision Process for one tic-tac-toe player against a random opponent.

    States are the ``int`` representations of canonical boards on which it is
    the player's turn to move.

    Attributes:
        player: the side (``Player.X`` or ``Player.O``) the agent plays.
        discount: discount factor applied to the utility of successor states.
        states: set of state integers produced by ``generate_states``.
    """

    def __init__(self, player = Player.X, discount = 0.5) -> None:
        self.player = player
        self.discount = discount
        self.states = self.generate_states()

    def generate_states(self):
        """Generate all board states where it is the player's turn.

        Every ordered sequence of an appropriate number of moves (even for X,
        odd for O) is played, sequences leading to the same board are
        skipped, the board is canonized to merge symmetric positions, and
        boards that are not ``valid()`` are pruned.
        """
        states = set()
        first_length = 0 if self.player == Player.X else 1
        for length in range(first_length, 9, 2):
            # iterate lazily: materializing the permutations is not needed
            sequences = permutations(range(1, 10), length)
            for perm in unique_everseen(sequences, key = lambda e: int(TicTacToe(list(e)))):  # remove equivalent boards
                board, _ = TicTacToe(list(perm)).canonize()
                if board.valid():
                    states.add(int(board))
        return states

    def reward(self, state):
        """Calculate the reward for a state.

        Returns 0 if the state is non-terminal, -0.5 for a tie, 1 if the
        player wins and -1 in every other case.
        """
        status = TicTacToe(state).check_status()
        if status == Status.ONGOING:
            return 0
        if status == Status.TIE:
            return -0.5
        player_won = (
            (status == Status.X_WINS and self.player == Player.X)
            or (status == Status.O_WINS and self.player == Player.O)
        )
        return 1 if player_won else -1

    @staticmethod
    def transition_model(state, action):
        """Return a list of ``(state, prob)`` tuples for playing ``action`` in ``state``.

        * If ``state`` is already terminal, the state itself is returned with
          probability 0.
        * If the player's move ends the game, the resulting canonical state
          is returned with probability 1.
        * Otherwise every free position counts once as a possible opponent
          reply; replies leading to the same canonical board are merged and
          the counts are normalized into probabilities.
        """
        ttt = TicTacToe(state)
        if ttt.check_status() != Status.ONGOING:  # the state is terminal before the player moves
            return [(int(ttt), 0)]
        ttt.play(action)
        if ttt.check_status() != Status.ONGOING:  # the state is terminal after the player moves, but before the opponent does
            return [(int(ttt.canonize()[0]), 1)]
        counts = {}
        for move in ttt.available:
            reply = deepcopy(ttt)
            reply.play(move)
            reply, _ = reply.canonize()
            if reply.check_status() != Status.INVALID:
                key = int(reply)
                counts[key] = counts.get(key, 0) + 1
        total = sum(counts.values())
        return [(key, count / total) for key, count in counts.items()]

    def q_value(self, state, action, utilities):
        """Return a utility value for the given action at the given state.

        ``utilities`` maps state integers to their current utility estimate.

        NOTE: a terminal successor contributes its full reward, not weighted
        by its probability. Terminal start states (reported with probability
        0 by ``transition_model``) therefore evaluate to their reward.
        """
        value = 0
        for possible_state, probability in MDP.transition_model(state, action):
            if TicTacToe(possible_state).check_status() != Status.ONGOING:
                value += self.reward(possible_state)
            else:
                value += probability * (self.reward(possible_state) + self.discount * utilities[possible_state])
        return value

In [32]:
def value_iteration(mdp: MDP, eps = 0.0001):
    """Calculate the utility of each state by value iteration.

    Sweeps over all states of ``mdp`` until the largest utility change in a
    sweep, ``delta``, satisfies ``delta <= eps * (1 - discount) / discount``.
    Progress (a tqdm bar and the delta of each epoch) is printed.

    Returns a dictionary containing the best move for each state (policy);
    the entry is None for a state without available moves.
    """
    def bellman_update(state, utilities):
        """Return ``(best_action, best_value)`` for ``state``."""
        candidates = [
            (action, mdp.q_value(state, action, utilities))
            for action in TicTacToe(state).available
        ]
        if not candidates:
            # full board: no action to choose, the utility is the reward
            return None, mdp.reward(state)
        return max(candidates, key = lambda e: e[1])

    utilities_prime = {state: 0 for state in mdp.states}
    policy = {state: None for state in mdp.states}
    epoch = 0
    while True:
        delta = 0
        epoch += 1
        utilities = utilities_prime.copy()  # values of the previous sweep
        for state in tqdm(mdp.states):
            policy[state], utilities_prime[state] = bellman_update(state, utilities)
            delta = max(delta, abs(utilities_prime[state] - utilities[state]))
        print("epoch", epoch, "delta =", delta)
        # same test as delta <= eps * (1 - discount) / discount, written
        # without the division so that a discount of 0 does not raise
        if delta * mdp.discount <= eps * (1 - mdp.discount):
            break
    return policy

In [33]:
# Policy for the X player, computed by value iteration.
policy_x = value_iteration(mdp=MDP(Player.X))

100%|██████████| 426/426 [00:00<00:00, 460.43it/s]


epoch 1 delta = 2


100%|██████████| 426/426 [00:00<00:00, 458.61it/s]


epoch 2 delta = 0.5


100%|██████████| 426/426 [00:00<00:00, 448.19it/s]


epoch 3 delta = 0.25


100%|██████████| 426/426 [00:00<00:00, 447.67it/s]


epoch 4 delta = 0.020182291666666685


100%|██████████| 426/426 [00:00<00:00, 455.23it/s]


epoch 5 delta = 0.00016276041666665741


100%|██████████| 426/426 [00:00<00:00, 441.72it/s]

epoch 6 delta = 0





In [34]:
# Policy for the O player, computed by value iteration.
policy_o = value_iteration(mdp=MDP(Player.O))

100%|██████████| 383/383 [00:00<00:00, 435.39it/s]


epoch 1 delta = 2.0


100%|██████████| 383/383 [00:00<00:00, 445.85it/s]


epoch 2 delta = 0.5


100%|██████████| 383/383 [00:00<00:00, 446.12it/s]


epoch 3 delta = 0.2


100%|██████████| 383/383 [00:00<00:00, 446.91it/s]


epoch 4 delta = 0.01428571428571429


100%|██████████| 383/383 [00:00<00:00, 433.68it/s]

epoch 5 delta = 0





In [35]:
def policy_agent(policy, board:TicTacToe):
    """Policy agent: look up the move stored in ``policy`` for the board's integer state."""
    state = int(board)
    return policy[state]

In [36]:
def evaluate_policy(player, policy, games=100_000):
    """Play ``games`` games against the random agent and print the outcome rates.

    ``player`` is the side controlled by ``policy``; the other side plays
    random moves. On the player's turn the board is canonized, the policy's
    move is played on the canonical board, and the result is mapped back to
    the original orientation.

    Prints the win, loss and tie rates as percentages of ``games``.

    Raises:
        ValueError: if ``games`` is not positive.
    """
    if games <= 0:
        raise ValueError("games must be a positive integer")
    wins = 0
    losses = 0
    ties = 0
    for _ in tqdm(range(games)):
        ttt = TicTacToe()
        status = Status.ONGOING
        while status == Status.ONGOING:
            if ttt.current_player() == player:
                ttt, morph = ttt.canonize()
                ttt.play(policy_agent(policy, ttt))
                ttt = ttt.transform(morph, revert=True)
            else:
                ttt.play(random_agent(ttt))
            status = ttt.check_status()
        if (status == Status.X_WINS and player == Player.X) or (status == Status.O_WINS and player == Player.O):
            wins += 1
        elif (status == Status.O_WINS and player == Player.X) or (status == Status.X_WINS and player == Player.O):
            losses += 1
        else:
            ties += 1
    print("Playing as", Player(player).name)
    print("Win rate:", 100 * wins / games, "%\nLoss rate:", 100 * losses / games, "%\nTie rate:", 100 * ties / games, "%")


100%|██████████| 100000/100000 [01:16<00:00, 1310.46it/s]

Playing as X
Win rate: 99.477 %
Loss rate: 0.0 %
Tie rate: 0.523 %





In [37]:
# Evaluate both value-iteration policies against the random agent.
print("Value Iteration")
evaluate_policy(player=Player.X, policy=policy_x)
print()
evaluate_policy(player=Player.O, policy=policy_o)

100%|██████████| 100000/100000 [01:22<00:00, 1218.78it/s]

Playing as O
Win rate: 91.637 %
Loss rate: 0.0 %
Tie rate: 8.363 %





In [38]:
def policy_iteration(mdp: MDP):
    """Compute a policy by alternating policy evaluation and greedy improvement.

    Starts from a random move for every state, then repeats: one in-place
    evaluation sweep of the current policy, followed by switching each state
    to its best action whenever that action is strictly better than the
    current one. Stops after the first round in which no state changes,
    prints the number of rounds and returns the policy dictionary.
    """
    def policy_eval(policy, utilities):
        # single sweep; updated utilities are visible to later states in the same sweep
        for state in utilities:
            utilities[state] = mdp.q_value(state, policy[state], utilities)
        return utilities

    policy = {}
    for state in mdp.states:
        policy[state] = choice(list(TicTacToe(state).available))
    utilities = dict.fromkeys(mdp.states, 0)
    epoch = 0
    while True:
        epoch += 1
        utilities = policy_eval(policy, utilities)
        changed = False
        for state in mdp.states:
            top_action, top_value = None, None
            for candidate in TicTacToe(state).available:
                score = mdp.q_value(state, candidate, utilities)
                if top_value is None or score > top_value:
                    top_action, top_value = candidate, score
            current_value = mdp.q_value(state, policy[state], utilities)
            if top_value > current_value:
                policy[state] = top_action
                changed = True
        if not changed:
            break
    print("Policy found after", epoch, "epochs.")
    return policy


In [39]:
# Policy for the X player, computed by policy iteration.
policy_x = policy_iteration(MDP(Player.X))

Policy found after 5 epochs.


In [None]:
# Policy for the O player, computed by policy iteration.
policy_o = policy_iteration(MDP(Player.O))

In [None]:
# Evaluate both policy-iteration policies against the random agent.
print("Policy Iteration")
evaluate_policy(player=Player.X, policy=policy_x)
print()
evaluate_policy(player=Player.O, policy=policy_o)