Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB10

Use reinforcement learning to devise a tic-tac-toe player.

### Deadlines:

* Submission: [Dies Natalis Solis Invicti](https://en.wikipedia.org/wiki/Sol_Invictus)
* Reviews: [Befana](https://en.wikipedia.org/wiki/Befana)

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (i.e., better to commit an empty work than not to commit)

In [1]:
from itertools import combinations
from collections import namedtuple, defaultdict
from random import choice
from copy import deepcopy
import time
from collections import Counter
from typing import Callable, List, Set
from tqdm.auto import tqdm
import numpy as np
import math

In [2]:
# A position is a pair of sets: the cell numbers taken by X and those taken by O.
State = namedtuple('State', ['x', 'o'])

In [3]:
# Cell numbers of the 3x3 board, row by row. They form a magic square: every
# row, column and diagonal adds up to 15, which is the sum win() looks for.
MAGIC = [2, 7, 6, 9, 5, 1, 4, 3, 8]

In [4]:
def print_board(pos):
    """Print the board as three rows: X, O, or a dot for a free cell."""
    for row_start in (0, 3, 6):
        for number in MAGIC[row_start:row_start + 3]:
            if number in pos.x:
                mark = 'X'
            elif number in pos.o:
                mark = 'O'
            else:
                mark = '.'
            print(mark, end='')
        print()
    print()

In [5]:
def win(elements):
    """Return True when any three of the given cell numbers add up to 15."""
    for triple in combinations(elements, 3):
        if sum(triple) == 15:
            return True
    return False

def state_value(pos: State):
    """Score a position: 1 if X has won, -1 if O has won, 0 otherwise."""
    if win(pos.x):
        return 1
    return -1 if win(pos.o) else 0

In [6]:
def random_game():
    """Play one game with uniformly random moves.

    X moves first and the players alternate until one of them wins or the
    board is full. Returns the list of positions, one snapshot per move.
    """
    board = State(set(), set())
    free = set(range(1, 9 + 1))
    history = []

    # Nine cells, so at most nine moves; even turns belong to X, odd ones to O.
    for turn in range(9):
        stones = board.x if turn % 2 == 0 else board.o
        move = choice(list(free))
        stones.add(move)
        history.append(deepcopy(board))
        free.remove(move)
        if win(stones):
            break
    return history

In [9]:
# Estimate a value for every visited position from random games: each position
# of a game is nudged towards that game's final reward.
value_dictionary = defaultdict(float)
hit_state = defaultdict(int)
# Step size of the running average used in the update below.
epsilon = 0.001

for steps in tqdm(range(500_000)):
    trajectory = random_game()
    final_reward = state_value(trajectory[-1])
    for state in trajectory:
        # Sets are not hashable, so the dictionary key is a pair of frozensets.
        hashable_state = tuple(map(frozenset, state))
        hit_state[hashable_state] += 1
        value_dictionary[hashable_state] += epsilon * (
            final_reward - value_dictionary[hashable_state]
        )

  0%|          | 0/500000 [00:00<?, ?it/s]

In [217]:
# Show the ten positions with the highest learned value.
sorted(value_dictionary.items(), key=lambda e: e[1], reverse=True)[0:10]

[((frozenset({4, 5, 6}), frozenset({1, 2})), 0.9999999999999445),
 ((frozenset({5, 6}), frozenset({1, 2})), 0.9930526291561056),
 ((frozenset({5, 6}), frozenset({1})), 0.9872972345061332),
 ((frozenset({1, 6, 7, 8, 9}), frozenset({2, 3, 4, 5})), 0.9223293518739043),
 ((frozenset({1, 4, 5, 6, 7}), frozenset({2, 3, 8, 9})), 0.9216268080073997),
 ((frozenset({1, 3, 5, 6, 9}), frozenset({2, 4, 7, 8})), 0.9195613374019573),
 ((frozenset({1, 2, 3, 5, 9}), frozenset({4, 6, 7, 8})), 0.9186711800754285),
 ((frozenset({4, 5, 6, 8, 9}), frozenset({1, 2, 3, 7})), 0.9185897698452737),
 ((frozenset({1, 2, 3, 4, 9}), frozenset({5, 6, 7, 8})), 0.9180176118921533),
 ((frozenset({2, 4, 5, 7, 8}), frozenset({1, 3, 6, 9})), 0.9179355474395928)]

## Methods for evaluating

Before setting up a reinforcement-learning policy for playing the game, I define a metric that measures the winning rate when playing against a random player. The method considers both possibilities of starting the game (i.e., either player may move first). Furthermore, I create a class for selecting the next action among all available choices.

In [7]:
class RandomPolicy:
    """Policy that picks uniformly at random among the available cells."""

    def __init__(self):
        # Label shown in the evaluation report (was misspelled 'ranodm').
        self.typ = 'random'

    def get(self, state: State, available: Set[int]) -> int:
        """Return a random cell from `available`; `state` is not used."""
        return choice(list(available))

In [8]:
class Evaluation:
    """Plays games between two policies and reports how often x_policy wins."""

    def __init__(self, x_policy, o_policy):
        self.x_policy = x_policy
        self.o_policy = o_policy
        self._empty()

    def _empty(self):
        """Reset the trajectory, the board and the set of free cells."""
        self.trajectory = list()
        self.state = State(set(), set())
        self.available = set(range(1, 9+1))

    def _get_stats(self, reward_list):
        """Count how many games ended with each reward."""
        return Counter(reward_list)

    def _move(self, policy, stones):
        """Let `policy` choose a free cell, place it in `stones`, record the position."""
        cell = policy.get(self.state, self.available)
        stones.add(cell)
        self.trajectory.append(deepcopy(self.state))
        self.available.remove(cell)

    def play(self, switch):
        """Play one game and return its list of positions.

        The first mover always fills `state.x`, the second `state.o`. With
        `switch` false x_policy moves first; with `switch` true o_policy does.
        The board is reset afterwards, also when a policy raises, so the next
        call always starts from an empty board.
        """
        if switch:
            first, second = self.o_policy, self.x_policy
        else:
            first, second = self.x_policy, self.o_policy

        try:
            while self.available:
                self._move(first, self.state.x)
                if win(self.state.x) or not self.available:
                    break

                self._move(second, self.state.o)
                if win(self.state.o):
                    break
            # _empty() rebinds self.trajectory, so the returned list is never
            # touched again and does not need to be copied.
            return self.trajectory
        finally:
            self._empty()

    def evaluate(self, n_games, switch=False):
        """Play `n_games` games, print x_policy's winning rate, return the counts.

        Args:
            n_games: number of games to play.
            switch: forwarded to play(); when true x_policy moves second.

        Returns:
            Counter mapping each final reward (1, 0, -1) to its frequency.
        """
        rewards = []
        for _ in tqdm(range(n_games)):
            tr = self.play(switch=switch)
            rewards.append(state_value(tr[-1]))
        stats = self._get_stats(rewards)

        # state_value() is +1 when the first mover wins, so x_policy's wins are
        # the -1 rewards when it moved second.
        wins = stats[-1] if switch else stats[1]
        # Avoid ZeroDivisionError when no game was requested.
        rate = wins / n_games if n_games > 0 else 0.0

        print(f'total number of plays: {n_games}\n')
        print(f'winning accuracy of {self.x_policy.typ}: {rate}\n')
        print(40*'_')
        return stats
    
        
        
        

In [168]:
x_policy = RandomPolicy()
o_policy = RandomPolicy()

# Named `evaluation` so that the builtin eval() is not shadowed.
evaluation = Evaluation(x_policy, o_policy)
evaluation.evaluate(1000)

total number of plays: 1000

winning accuracy of ranodm: 0.589

________________________________________


## Random Policy Conclusion
With the random policy the first player wins roughly 59 percent of the games (0.589 in the run above); a rate above 50 percent is expected, because the player who moves first has the advantage. Now we try to optimize the next-action selection by value iteration. In the laboratory of the course, we played the game many times to obtain "value_dictionary". Now we try to find the optimum policy $\pi^*$ that selects the best action based on the best values.


Value iteration is an algorithm used in reinforcement learning to find the optimal value function and policy for a Markov decision process (MDP). The algorithm iteratively updates the value function for each state until convergence.

### Update Equation
$V_{k+1}(s) = \max_a \left( R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V_k(s') \right)$


### Optimal Policy
$\pi^*(s) = \arg\max_a \left( R(s, a) + \gamma \sum_{s'} P(s' \mid s, a) V^*(s') \right)$


Value iteration converges to the optimal values and policy, making it a key algorithm in solving MDPs.


In [10]:
class OptimumPolicy:
    """Greedy policy over a table of position values.

    The table maps ``(frozenset(x_cells), frozenset(o_cells))`` to a value
    that is expected to be high when the position is good for X (the updates
    in iteration() pull it towards +1 for an X win and -1 for an O win).
    """

    def __init__(self,):
        self.vd = defaultdict(float)
        self.typ = 'optimum'

    def set_vd(self, value_dictionary):
        """Use `value_dictionary` (any mapping) as the value table."""
        self.vd = value_dictionary

    def get_vd(self):
        """Return the value table currently in use."""
        return self.vd

    def get(self, state: State, available: Set[int]) -> int:
        """Return the cell in `available` leading to the best next position.

        The side to move is inferred from the board: X moves when both
        players hold the same number of cells, O otherwise. X picks the
        successor with the highest value, O the one with the lowest, since
        the table is expressed from X's point of view. Unknown positions
        count as 0.0 and are not inserted in the table.
        """
        x_to_move = len(state.x) == len(state.o)
        xs, os_ = frozenset(state.x), frozenset(state.o)

        def successor_value(mov):
            if x_to_move:
                key = (xs | {mov}, os_)
            else:
                key = (xs, os_ | {mov})
            # .get() keeps lookups read-only and also works with a plain dict.
            return self.vd.get(key, 0.0)

        pick = max if x_to_move else min
        return pick(available, key=successor_value)

    def iteration(self, n_steps, learning_rate=None):
        """Refine the table with `n_steps` games of greedy self-play.

        Every position of a game is moved towards that game's final reward.
        `learning_rate` is the step size; when omitted the module-level
        `epsilon` is used.
        """
        step = epsilon if learning_rate is None else learning_rate
        for _ in tqdm(range(n_steps)):
            trajectory = self._play()
            final_reward = state_value(trajectory[-1])
            for state in trajectory:
                hashable_state = (frozenset(state.x), frozenset(state.o))
                old = self.vd.get(hashable_state, 0.0)
                self.vd[hashable_state] = old + step * (final_reward - old)

    def _play(self):
        """Play one game in which both sides move through get(); return the positions."""
        trajectory = list()
        state = State(set(), set())
        available = set(range(1, 9+1))

        while available:
            # X moves on a balanced board, O when X is one stone ahead.
            stones = state.x if len(state.x) == len(state.o) else state.o
            move = self.get(state, available)
            stones.add(move)
            trajectory.append(deepcopy(state))
            available.remove(move)
            if win(stones):
                break
        return trajectory

## Testing optimum policy
Now we evaluate the $\pi^*$ policy against the random policy. We can also use value iteration to update the value dictionary and obtain a better winning accuracy. All the rules are implemented under the assumption that our agent plays first.

In [179]:
x_policy = OptimumPolicy()
x_policy.set_vd(value_dictionary)
o_policy = RandomPolicy()


# Named `evaluation` so that the builtin eval() is not shadowed.
evaluation = Evaluation(x_policy, o_policy)
evaluation.evaluate(10_000)

  0%|          | 0/10000 [00:00<?, ?it/s]

total number of plays: 10000

winning accuracy of optimum: 0.9893

________________________________________


In [180]:
# NOTE(review): the progress bar recorded below counts to 500000, so this cell
# was presumably last run with 500_000 steps -- confirm the intended number.
x_policy.iteration(50_000)

  0%|          | 0/500000 [00:00<?, ?it/s]

In [182]:
# Named `evaluation` so that the builtin eval() is not shadowed.
evaluation = Evaluation(x_policy, o_policy)
evaluation.evaluate(10_000)

  0%|          | 0/10000 [00:00<?, ?it/s]

total number of plays: 10000

winning accuracy of optimum: 0.9904

________________________________________


In [184]:
# Keep a handle on the value table held by the policy.
vd = x_policy.get_vd()

## Q-Learning
We can see that now, after 500_000 iterations, we got a better optimum policy. Now we implement the Q-learning technique for further investigation. First, let's discuss Q-values.

In reinforcement learning, the Q-value $Q(s, a)$ represents the expected cumulative reward when an agent takes action $a$ in state $s$. It is defined by the Bellman equation:

$Q(s, a) = R(s, a) + \gamma \max_{a'} Q(s', a')$

Here, $R(s, a)$ is the immediate reward, $s'$ is the next state, $\gamma$ is the discount factor, and $\max_{a'} Q(s', a')$ is the maximum Q-value in the next state. Q-values are crucial in algorithms like Q-learning for optimizing an agent's policy. Based on the "value_dictionary" obtained during the previous iteration, we consider as the immediate value $R(s, a)$ the newly learned value with respect to the action made in each state.

In [57]:
def new_random_game():
    """Play one random game and return its (position, action) pairs.

    X moves first and the players alternate until one of them wins or the
    board is full. Each entry holds a snapshot of the board right after a
    move together with the cell that was just played, for X and O moves
    alike (O's entries used to carry X's previous cell by mistake).
    """
    trajectory = list()
    state = State(set(), set())
    available = set(range(1, 9+1))

    while available:
        # X moves on a balanced board, O when X is one stone ahead.
        stones = state.x if len(state.x) == len(state.o) else state.o
        move = choice(list(available))
        stones.add(move)
        trajectory.append((deepcopy(state), move))
        available.remove(move)
        if win(stones):
            break
    return trajectory

In [59]:
# Learn a value for every (position, action) pair seen in random games: each
# pair of a game is nudged towards that game's final reward.
q_table = defaultdict(float)
hit_state = defaultdict(int)
# Step size of the running average used in the update below.
epsilon = 0.001

for steps in tqdm(range(1_000_000)):
    trajectory = new_random_game()
    final_reward = state_value(trajectory[-1][0])
    for state, action in trajectory:
        # Hashable key: both sets of cells as frozensets, then the action.
        hashable_state = (*map(frozenset, state), action)
        hit_state[hashable_state] += 1
        q_table[hashable_state] += epsilon * (
            final_reward - q_table[hashable_state]
        )

  0%|          | 0/1000000 [00:00<?, ?it/s]

In [88]:
# Show the ten (position, action) entries with the highest learned value.
sorted(q_table.items(), key=lambda e: e[1], reverse=True)[0:10]

[((frozenset({1, 2, 4, 5, 8}), frozenset({3, 6, 7, 9}), 2),
  0.8262043479500509),
 ((frozenset({1, 2, 3, 5, 9}), frozenset({4, 6, 7, 8}), 9),
  0.8117218817177256),
 ((frozenset({3, 5, 6, 7, 9}), frozenset({1, 2, 4, 8}), 5),
  0.8113447598927512),
 ((frozenset({1, 4, 5, 6, 7}), frozenset({2, 3, 8, 9}), 5),
  0.8111559158085598),
 ((frozenset({1, 2, 4, 5, 8}), frozenset({3, 6, 7, 9}), 8),
  0.8107776603516027),
 ((frozenset({2, 4, 7, 8, 9}), frozenset({1, 3, 5, 6}), 9), 0.810588248600203),
 ((frozenset({2, 3, 5, 8, 9}), frozenset({1, 4, 6, 7}), 2),
  0.8102088561035539),
 ((frozenset({1, 2, 3, 4, 8}), frozenset({5, 6, 7, 9}), 8),
  0.8083004642615419),
 ((frozenset({2, 6, 7, 8, 9}), frozenset({1, 3, 4, 5}), 2), 0.8079164893237),
 ((frozenset({2, 3, 5, 8, 9}), frozenset({1, 4, 6, 7}), 5),
  0.8073390843668866)]

In [89]:
class QPolicy:
    """Look-ahead policy over a table of (position, action) values.

    The table maps ``(frozenset(x_cells), frozenset(o_cells), action)``, where
    the position is the board right after `action` was played, to a value that
    is expected to be high when the outcome is good for X (the updates in
    iteration() pull it towards +1 for an X win and -1 for an O win).
    """

    # Every cell number of the board.
    _ALL_CELLS = frozenset(range(1, 9 + 1))

    def __init__(self,):
        self.q_table = defaultdict(float)
        self.typ = 'Q'

    def set_q_table(self, q_table):
        """Use `q_table` (any mapping) as the value table."""
        self.q_table = q_table

    def get_q_table(self):
        """Return the value table currently in use."""
        return self.q_table

    def get(self, state: State, available: Set[int]) -> int:
        """Return the cell in `available` with the highest Q() for the side to move."""
        # One cache per decision, so every position is expanded only once.
        cache = {}
        return max(available, key=lambda mov: self.Q(state, mov, cache=cache))

    def iteration(self, n_steps, learning_rate=None):
        """Refine the table with `n_steps` games of greedy self-play.

        Every (position, action) pair of a game is moved towards that game's
        final reward. `learning_rate` is the step size; when omitted the
        module-level `epsilon` is used.
        """
        step = epsilon if learning_rate is None else learning_rate
        for _ in tqdm(range(n_steps)):
            trajectory = self._play()
            final_reward = state_value(trajectory[-1][0])
            for state, action in trajectory:
                hashable_state = (frozenset(state.x), frozenset(state.o), action)
                old = self.q_table.get(hashable_state, 0.0)
                self.q_table[hashable_state] = old + step * (final_reward - old)

    def _play(self):
        """Play one game in which both sides move through get().

        Returns the (position, action) pairs, each action being the cell
        that was just played (O's entries used to carry X's cell by mistake).
        """
        trajectory = list()
        state = State(set(), set())
        available = set(range(1, 9+1))

        while available:
            # X moves on a balanced board, O when X is one stone ahead.
            stones = state.x if len(state.x) == len(state.o) else state.o
            move = self.get(state, available)
            stones.add(move)
            trajectory.append((deepcopy(state), move))
            available.remove(move)
            if win(stones):
                break
        return trajectory

    def R(self, state: State, action: int) -> float:
        """Return the table value of `state` reached by `action` (0.0 if unknown).

        The lookup is read-only: unknown entries are not added to the table.
        """
        return self.q_table.get((frozenset(state.x), frozenset(state.o), action), 0.0)

    def Q(self, state: State, action: int, gamma: float = 0.9, cache=None) -> float:
        """Score playing `action` in `state` from the mover's point of view.

        The side to move is inferred from the board (X on a balanced board,
        O otherwise) and `state` itself is left untouched. The score is the
        table value of the resulting position plus `gamma` times the best
        continuation the mover can still secure against the opponent's most
        damaging reply; the search stops as soon as the game is over.

        Args:
            state: position before the move.
            action: free cell to play.
            gamma: discount applied to each further look-ahead step.
            cache: optional dict shared between calls to avoid expanding a
                position twice; reuse it only with the same `gamma` and an
                unchanged table.
        """
        if cache is None:
            cache = {}
        xs, os_ = frozenset(state.x), frozenset(state.o)
        if len(xs) == len(os_):
            xs = xs | {action}
        else:
            os_ = os_ | {action}
        return self._after_value(xs, os_, action, gamma, cache)

    def _after_value(self, xs, os_, action, gamma, cache):
        """Value, for the player who just played `action`, of the position (xs, os_)."""
        key = (xs, os_, action)
        if key in cache:
            return cache[key]

        # X has just moved exactly when it is one stone ahead.
        x_moved = len(xs) > len(os_)
        # The table is written from X's point of view; flip it for O.
        sign = 1.0 if x_moved else -1.0
        value = sign * self.q_table.get(key, 0.0)

        free = self._ALL_CELLS - xs - os_
        if free and not win(xs if x_moved else os_):
            # Assume the opponent answers with the reply that hurts the most.
            value += gamma * min(
                self._reply_value(xs, os_, reply, free - {reply}, x_moved, gamma, cache)
                for reply in free
            )

        cache[key] = value
        return value

    def _reply_value(self, xs, os_, reply, left, x_moved, gamma, cache):
        """Value for the original mover once the opponent has answered with `reply`."""
        if x_moved:
            os_ = os_ | {reply}
            replier = os_
        else:
            xs = xs | {reply}
            replier = xs

        if not left or win(replier):
            # The game ended on the reply: score that final position.
            sign = 1.0 if x_moved else -1.0
            return sign * self.q_table.get((xs, os_, reply), 0.0)

        # Otherwise the original mover continues with its own best move.
        if x_moved:
            return max(
                self._after_value(xs | {nxt}, os_, nxt, gamma, cache) for nxt in left
            )
        return max(
            self._after_value(xs, os_ | {nxt}, nxt, gamma, cache) for nxt in left
        )

In [92]:
o_policy = RandomPolicy()
x_policy = QPolicy()
x_policy.set_q_table(q_table)

# Named `evaluation` so that the builtin eval() is not shadowed.
evaluation = Evaluation(x_policy, o_policy)
evaluation.evaluate(5)

  0%|          | 0/5 [00:00<?, ?it/s]

total number of plays: 5

winning accuracy of Q: 1.0

________________________________________


In [93]:
# Refine the Q-table with a few games in which the policy plays both sides.
x_policy = QPolicy()
x_policy.set_q_table(q_table)
x_policy.iteration(5)

  0%|          | 0/5 [00:00<?, ?it/s]

In [102]:
class color:
    # ANSI escape sequences for terminal text styling; END resets the style.
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'

def print_board(pos):
    """Print the board with cell borders.

    Taken cells show a bold X or O; free cells show their own number so the
    player can see what to type.
    """
    for row in range(3):
        for col in range(3):
            number = MAGIC[row * 3 + col]
            if number in pos.x:
                cell = f'{color.BOLD}X{color.END}'
            elif number in pos.o:
                cell = f'{color.BOLD}O{color.END}'
            else:
                cell = f'{number}'
            # Only the last cell of a row gets a closing border.
            closing = '|' if col == 2 else ''
            print(f'|{cell}{closing}', end='')
        print()
    print()

In [99]:
def get_input(available):
    """Ask the user for a move until a free cell number is entered.

    Text that is not an integer and cells that are not in `available` are
    both rejected with a message and the question is asked again.
    """
    while True:
        raw = input(f"Enter your move from {available}:  ")
        try:
            x = int(raw)
        except ValueError:
            # A typo must not abort the whole game.
            print(f'{raw!r} is not a valid number')
            continue
        if x in available:
            return x
        print(f'{x} is not available')
        
def _human_round(policy):
    """Play one game: the human enters X's moves, `policy` answers as O."""
    state = State(set(), set())
    available = set(range(1, 9+1))

    while available:
        print('\ncurrent board with available positions in\n')
        print_board(state)
        x = get_input(available)
        state.x.add(x)
        available.remove(x)
        if win(state.x):
            print("\nCongrats! you win the game.\n")
            print(30*'_')
            break
        if not available:
            print("\nit's draw.\n")
            print(30*'_')
            break

        o = policy.get(state, available)
        state.o.add(o)
        available.remove(o)
        if win(state.o):
            print("\nYou messed up unfortunately!\n")
            print(30*'_')
            break

        print(30*'_')
    print('\n')


def experiment(policy):
    """Let a human play against `policy` for as many games as they like.

    The human always moves first. After each game the user is asked whether
    to play again; any answer other than "y" ends the session.
    """
    # A loop instead of a recursive call, so long sessions do not pile up frames.
    while True:
        _human_round(policy)
        play_again = input("Do you want to play again? (y/n) :")
        if play_again != "y":
            break

In [100]:
# Interactive session: the human moves first as X, the value-table policy answers as O.
x_policy = OptimumPolicy()
x_policy.set_vd(value_dictionary)
experiment(x_policy)


current board with available positions in

|2|7|6|
|9|5|1|
|4|3|8|

Enter your move from {1, 2, 3, 4, 5, 6, 7, 8, 9}:  1
______________________________

current board with available positions in

|[1mO[0m|7|6|
|9|5|[1mX[0m|
|4|3|8|

Enter your move from {3, 4, 5, 6, 7, 8, 9}:  5
______________________________

current board with available positions in

|[1mO[0m|7|6|
|9|[1mX[0m|[1mX[0m|
|4|[1mO[0m|8|

Enter your move from {4, 6, 7, 8, 9}:  9

Congrats! you win the game.

______________________________


Do you want to play again? (y/n) :y

current board with available positions in

|2|7|6|
|9|5|1|
|4|3|8|

Enter your move from {1, 2, 3, 4, 5, 6, 7, 8, 9}:  2
______________________________

current board with available positions in

|[1mX[0m|7|6|
|9|5|[1mO[0m|
|4|3|8|

Enter your move from {3, 4, 5, 6, 7, 8, 9}:  7
______________________________

current board with available positions in

|[1mX[0m|[1mX[0m|6|
|9|5|[1mO[0m|
|4|[1mO[0m|8|

Enter your move from {4, 5

In [101]:
# Interactive session: the human moves first as X, the Q-table policy answers as O.
my_policy = QPolicy()
my_policy.set_q_table(q_table)
experiment(my_policy)


current board with available positions in

|2|7|6|
|9|5|1|
|4|3|8|

Enter your move from {1, 2, 3, 4, 5, 6, 7, 8, 9}:  2
______________________________

current board with available positions in

|[1mX[0m|7|6|
|9|5|[1mO[0m|
|4|3|8|

Enter your move from {3, 4, 5, 6, 7, 8, 9}:  7
______________________________

current board with available positions in

|[1mX[0m|[1mX[0m|6|
|9|5|[1mO[0m|
|4|[1mO[0m|8|

Enter your move from {4, 5, 6, 8, 9}:  9
______________________________

current board with available positions in

|[1mX[0m|[1mX[0m|6|
|[1mX[0m|5|[1mO[0m|
|[1mO[0m|[1mO[0m|8|

Enter your move from {5, 6, 8}:  5
______________________________

current board with available positions in

|[1mX[0m|[1mX[0m|[1mO[0m|
|[1mX[0m|[1mX[0m|[1mO[0m|
|[1mO[0m|[1mO[0m|8|

Enter your move from {8}:  8

Congrats! you win the game.

______________________________


Do you want to play again? (y/n) :y

current board with available positions in

|2|7|6|
|9|5|1|
|4|3|8|

## Conclusion
The model works perfectly against random selection; however, it lacks reasoning against humans. To improve accuracy we have to optimize the Q-table we designed earlier, which requires a high computation time. Another option is to train a neural network, which can outperform the previous models.