In [10]:
import numpy as np
import torch
from collections import defaultdict
from sympy.utilities.iterables import subsets
from sympy.utilities.iterables import multiset_permutations
from scipy.special import comb
import pandas as pd
import gym
device = 'cuda' if torch.cuda.is_available else 'cpu'

# %load_ext line_profiler

In [21]:
def random_policy(board, symbol):
    available_pos = np.array(np.where(game.board == 0)).T
    return tuple(available_pos[np.random.randint(available_pos.shape[0])])

def deterministic_policy(board, symbol):
    
    def check_almost_align(board, s):
        size = board.shape[0]
        for irow in range(size):
            if board[irow, :].sum() == s * (size - 1):
                i_empty = np.where(board[irow, :] == 0)[0].item()
                return (irow, i_empty)
        for icol in range(size):
            if board[:, icol].sum() == s * (size - 1):
                i_empty = np.where(board[:, icol] == 0)[0].item()
                return (i_empty, icol)
        if np.diag(board).sum() == s * (size - 1):
            i_empty = np.where(np.diag(board) == 0)[0].item()
            return (i_empty, i_empty)
        if np.diag(np.rot90(board)).sum() == s * (size - 1):
            for i in range(size):
                if board[i, size - 1 - i] == 0:
                    return (i, size - 1 - i)
        return None
    
    coords = check_almost_align(board, symbol)
    if coords is not None:
        return coords
    coords = check_almost_align(board, -symbol)
    if coords is not None:
        return coords
    
    #size = board.shape[0]
    #if board[size // 2, size // 2] == 0:
    #    return (size // 2, size // 2)
    
    available_pos = np.array(np.where(game.board == 0)).T
    return tuple(available_pos[np.random.randint(available_pos.shape[0])])

In [22]:
def q_learning(game, agent, opponent_policy, alpha, alpha_factor, gamma, epsilon, epsilon_factor, \
               r_win, r_lose, r_even, r_even2, num_episodes):
    
    def algo(q_array, code_state, code_action, code_new_state, code_new_action, reward, alpha, gamma, state=None):
        if code_new_state is None:
            return q_array.loc[code_state, code_action] + alpha * (reward - q_array.loc[code_state, code_action])
        potential_actions = q_array.loc[code_new_state, :]
        free_spots_scores = [potential_actions.loc[a] for a in potential_actions.columns if state[Agent.decode(a)] == 0]
        return q_array.loc[code_state, code_action] + alpha * (reward + gamma * np.max(free_spots_scores) \
                                                               - q_array.loc[code_state, code_action])  
    agent.algo = algo
    for episode_index in range(num_episodes):
        if episode_index % 1000 == 0:
            print('.', end='')
        alpha *= alpha_factor
        epsilon *= epsilon_factor
        state = game.reset()
        if episode_index%2 == 1:
            action = opponent_policy(game.board, game.turn)
            state, _, _, _ = game.step(action)
        while True:
            action = agent.epsilon_greedy_policy(state, epsilon)
            ################### The agent ignores this happens UNLESS it ends the game
            intermediate_state, final_reward, done, _ = game.step(action)
            if done:
                #### give a reward of 0.1 for an even game when the agent plays first
                if final_reward > 0:
                    final_reward = r_win
                else:
                    final_reward = r_even
                agent.update_Qtable(state, action, None, None, final_reward, alpha, gamma)
                break
            intermediate_action = opponent_policy(intermediate_state, game.turn)
            ################### ------------------------------------------------------
            new_state, reward, done, _ = game.step(intermediate_action)
            if done:
                if reward > 0:
                    reward = r_lose
                else:
                    reward = r_even2
                agent.update_Qtable(state, action, new_state, None, reward, alpha, gamma)
                break
            agent.update_Qtable(state, action, new_state, None, reward, alpha, gamma)
            state = new_state

In [49]:
def sarsa(game, agent, opponent_policy, alpha, alpha_factor, gamma, epsilon, epsilon_factor, \
          r_win, r_lose, r_even, r_even2, num_episodes):
    
    def algo(q_array, code_state, code_action, code_new_state, code_new_action, reward, alpha, gamma, state=None):
        if code_new_state is None or code_new_action is None:
            return q_array.loc[code_state, code_action] + alpha * (reward - q_array.loc[code_state, code_action])
        return q_array.loc[code_state, code_action] + alpha * \
                (reward + gamma * q_array.loc[code_new_state, code_new_action] - q_array.loc[code_state, code_action])
    agent.algo = algo
    
    tot_vict = 0
    tot_even = 0
    tot_lose = 0
    
    for episode_index in range(num_episodes):
        if episode_index % 1000 == 0:
            print('.', end='')
        alpha *= alpha_factor
        epsilon *= epsilon_factor
        state = game.reset()
        if episode_index%2 == 1:
            action = opponent_policy(game.board, game.turn)
            state, _, _, _ = game.step(action)
        action = agent.epsilon_greedy_policy(state, epsilon)
        while True:
            ################### The agent ignores this happens UNLESS it ends the game
            intermediate_state, final_reward, done, _ = game.step(action)
            if done:
                #### give a reward of 0.1 for an even game when the agent plays first
                if final_reward > 0:
                    tot_vict += 1
                    final_reward = r_win
                else:
                    tot_even += 1
                    final_reward = r_even
                agent.update_Qtable(state, action, None, None, final_reward, alpha, gamma)
                break
            intermediate_action = opponent_policy(intermediate_state, game.turn)
            ################### ------------------------------------------------------
            new_state, reward, done, _ = game.step(intermediate_action)
            if done:
                if reward > 0:
                    tot_lose += 1
                    reward = r_lose
                else:
                    tot_even += 1
                    reward = r_even2
                agent.update_Qtable(state, action, None, None, reward, alpha, gamma)
                break
            new_action = agent.epsilon_greedy_policy(new_state, epsilon)
            agent.update_Qtable(state, action, new_state, new_action, reward, alpha, gamma)
            state = new_state
            action = new_action
    
    print('tot vict :', tot_vict, ' tot lose :', tot_lose, ' tot even :', tot_even)

In [153]:
class Agent:
    
    def __init__(self, size=3, policy=None):
        self.q_array = pd.DataFrame(columns=[str(i)+str(j) for i in range(size) for j in range(size)])
        self.algo = None
        self.policy = policy
    
    def update_Qtable(self, state, action, new_state, new_action, reward, alpha, gamma):        
        code_state = self.encode_state(state)
        code_action = self.encode_action(action)
        #### create new rows for the new states met
        try:
            self.q_array.loc[code_state, code_action]
        except:
            self.q_array.loc[code_state, code_action] = 0
            self.q_array = self.q_array.fillna(0)
        ####
        if new_state is not None and new_action is not None:
            code_new_state = self.encode_state(new_state)
            code_new_action = self.encode_action(new_action)
            try:
                self.q_array.loc[code_new_state, code_new_action]
            except:
                self.q_array.loc[code_new_state, code_new_action] = 0
                self.q_array = self.q_array.fillna(0)
        else:
            code_new_state = None
            code_new_action = None
            
        new_val = self.algo(self.q_array, code_state, code_action, code_new_state, code_new_action, reward, alpha, gamma, state)
        self.q_array.loc[code_state, code_action] = new_val
        
    def play_vs_opponent(self, state, symbol):
        if self.policy is not None:
            return self.policy(state, symbol)
        state_code = self.encode_state(state)
        try:
            possible_moves = self.q_array.loc[state_code, :]
            best_move_score, best_move_idx = 0, None
            for cod_act in possible_moves.index:
                coords = self.decode_action(cod_act)
                if state[coords] == 0 and possible_moves.loc[cod_act] >= best_move_score:
                    best_move_score = possible_moves.loc[cod_act]
                    best_move_idx = cod_act
            return self.decode_action(best_move_idx)
        except:
            legal_moves = np.argwhere(state == 0)
        if legal_moves.shape[0] <= 2:
            return tuple(legal_moves[np.random.randint(legal_moves.shape[0])])
        min_max_rewards = []
        for action_array in legal_moves:
            intermediate_state = state.copy()
            intermediate_state[tuple(action_array)] = symbol
            legal_moves_opponent = np.argwhere(intermediate_state == 0)
            max_rewards = []
            for opponent_action_array in legal_moves_opponent:
                new_state = intermediate_state.copy()
                new_state[tuple(opponent_action_array)] = -symbol
                try:
                    code_new_state = self.encode_state(new_state)
                    max_reward = np.max(self.q_array.loc[code_state_code, :])
                    max_rewards.append(max_reward)
                except:
                    pass
            if max_rewards:
                min_max_reward = np.min(max_rewards)
                min_max_rewards.append(min_max_reward)
        if min_max_rewards:
            best_legal_move_idx = np.argmax(min_max_rewards)
            return tuple(legal_moves[best_legal_move_idx])
        return tuple(legal_moves[np.random.randint(legal_moves.shape[0])])
        
    def greedy_policy(self, state):
        """
        Return the next action as a tuple
        """
        code_state = self.encode_state(state)
        try:
            self.q_array.loc[code_sym_states]
        except:
            self.q_array.loc[code_sym_states] = 0
            
        actions = self.q_array.loc[code_sym_states]
        legal_action_codes = [act for act in actions.index if state[self.decode_action(act)] == 0]
        legal_actions = actions.loc[legal_action_codes]
        best_legal_actions = legal_actions[legal_actions == legal_actions.max()]
        code_action_taken = np.random.choice(best_legal_actions.index)
        coords_action = self.decode_action(code_action_taken)
        corresponding_state = self.decode_one_state(code_sym_states.split('_')[0])
        
        if np.prod(state == corresponding_state):
            return coords_action
        
        empty_board = state * 0
        empty_board[coords_action] = 1
        
        if np.prod(state == np.rot90(corresponding_state, 1)):
            return tuple(np.argwhere(np.rot90(empty_board) == 1)[0])
        
        if np.prod(state == np.rot90(corresponding_state, 2)):
            return tuple(np.argwhere(np.rot90(empty_board, 2) == 1)[0])
        
        if np.prod(state == np.rot90(corresponding_state, 3)):
            return tuple(np.argwhere(np.rot90(empty_board, 3) == 1)[0])
        
        if np.prod(state == corresponding_state[::-1]):
            return tuple(np.argwhere(empty_board[::-1] == 1)[0])
        
        if np.prod(state == corresponding_state[:, ::-1]):
            return tuple(np.argwhere(empty_board[:, ::-1] == 1)[0])
        
        if np.prod(state == corresponding_state.T):
            return tuple(np.argwhere(empty_board.T == 1)[0])
        
        if np.prod(state == corresponding_state[::-1, ::-1].T):
            return tuple(np.argwhere(empty_board[::-1, ::-1].T == 1)[0])

    def epsilon_greedy_policy(self, state, epsilon):
        """
        Return the next action as a tuple
        """
        if self.q_array.empty or np.random.rand() < epsilon:
            legal_moves = np.argwhere(state == 0)
            size = legal_moves.shape[0]
            random_idx = np.random.randint(size)
            action = tuple(legal_moves[random_idx])
        else:
            action = self.greedy_policy(state)
        return action
    
    def encode_action(self, action):
        if type(action) is str:
            return action
        code_action = ''
        for i in action:
            code_action += str(i)
        return code_action

    def decode_action(self, code_action):
        return tuple([int(i) for i in code_action])

    def encode_one_state(self, state):
        code_state = ''
        for i in state.flatten():
            code_state += str(i) if i != -1 else '2'
        return code_state

    def generate_all_sym_states(self, state):
        sym_states = [self.encode_one_state(state), self.encode_one_state(np.rot90(state, 1)), \
                      self.encode_one_state(np.rot90(state, 2)), self.encode_one_state(np.rot90(state, 3)), \
                      self.encode_one_state(state[::-1]), self.encode_one_state(state[:, ::-1]), \
                      self.encode_one_state(state.T), self.encode_one_state(state[::-1, ::-1].T)]
        sym_states.sort()
        return sym_states

    def encode_state(self, state):
        sym_states = self.generate_all_sym_states(state)
        return sym_states[0]
    
    def decode_one_state(self, code_state):
        flat_list = [0 if elem == '0' else 1 if elem == '1' else -1 for elem in list(code_state)]
        state = np.reshape(flat_list, (self.size, self.size))
        return state

In [51]:
class Game(gym.Env):
    
    def __init__(self, p1, p2, size=3, n_dim=2):
        assert(type(n_dim) is int and n_dim >= 2), "wrong n_dim"
        assert(type(size) is int and size >= 2), "wrong size"
        self.n_dim = n_dim
        self.size = size
        self.p1 = p1
        self.p2 = p2
        self.turn = 1
        self.board = np.zeros([size]*n_dim, dtype=int)
        self.current_score = (0, 0)
        self._max_episode_steps = 1000
        super(Game, self).__init__()
    
    def play_a_game(self):
        self.turn = 1
        self.reset()
        self.render()
        digits = '1234567890'
        while not self.is_done():
            if self.turn == 1:
                player = self.p1
            else:
                player = self.p2
            if isinstance(player, Agent):
                coords = player.play_vs_opponent(self.board, self.turn)
                print('Agent plays :', coords, '\n')
            else:
                coords_str = input('Coordinates of next move : ')
                print()
                coords = []
                for c in coords_str:
                    if c in digits:
                        coords.append(int(c))
                coords = tuple(coords)
                while len(coords) != self.n_dim or max(coords) >= self.size or not self.is_available(coords):
                    coords_str = input('Position not available, please try another one : ')
                    coords = []
                    for c in coords_str:
                        if c in digits:
                            coords.append(int(c))
                    coords = tuple(coords)
            self.step(coords)
            self.render()
        print('Game over. Score :', self.current_score)
        if self.current_score[0] > self.current_score[1]:
            print(self.p1, 'wins !')
        elif self.current_score[1] > self.current_score[0]:
            print(self.p2, 'wins !')
        else:
            print('Even score.')
    
    def is_available(self, position):
        return self.board[position] == 0
    
    def is_done(self):
        if self.n_dim == 2:
            return sum(self.current_score) != 0 or 0 not in self.board
        return 0 not in self.board
    
    def reset(self):
        self.board *= 0
        return self.board.copy()

    def step(self, position):
        self.board[position] = self.turn
        score_p1, score_p2 = self.score()
        score_p1_diff, score_p2_diff =  score_p1 - self.current_score[0], score_p2 - self.current_score[1]
        # update only the score of the player that did the latest move
        if self.turn == 1:
            self.current_score = (score_p1, self.current_score[1])
        else:
            self.current_score = (self.current_score[0], score_p2)
        reward = score_p1_diff if self.turn == 1 else score_p2_diff
        self.turn *= -1
        return self.board, reward, self.is_done(), None             
    
    def render(self):
        visual_board = self.board.copy()
        visual_board = np.where(visual_board == -1, 'O', visual_board)
        visual_board = np.where(visual_board == '1', 'X', visual_board)
        visual_board = np.where(visual_board == '0', '.', visual_board)
        for icol in range(self.size):
            for row in visual_board[icol, :]:
                print(row, end=' ')
            print()
        print()
    
    def score(self):
        score_p1 = 0
        score_p2 = 0
        
        def slice_to_mask(L):
            """
            Enables to use slicing operator like array[x, y, :, z] with choosing the position
            of the symbol ':' (represented with a -1 instead). For example L can be equal to
            [0, 0, -1, 0] if we want to access self.board[0, 0, :, 0]
            """
            mask = np.zeros([self.size] * self.n_dim, dtype=bool)
            dim = L.index(-1)
            for tile in range(self.size):
                L[dim] = tile
                mask[tuple(L)] = True
            return mask
        
        # vertical and horizontal axis
        all_axis = []
        for d in range(self.size ** self.n_dim):
            all_axis.append([(d // self.size**k) % self.size for k in range(self.n_dim)[::-1]])
            # example in 3D case with size 3 :
            # all_axis = [ [i, j, k] for i = 0, 1, 2 for j = 0, 1, 2 for k = 0, 1, 2 ]
        for d in range(self.n_dim):
            d_axis = np.array(all_axis)
            d_axis[:, d] = -1
            d_axis = np.unique(d_axis, axis=0)
            for axis in d_axis:
                space_mask = slice_to_mask(list(axis))
                in_game_axis = self.board[space_mask]
                axis_value = in_game_axis.sum().item()
                if axis_value == self.size:
                    score_p1 += 1
                elif axis_value == -self.size:
                    score_p2 += 1
        
        # diagonal axis
        diag = np.array([range(self.size)]).T
        antidiag = np.array([range(self.size-1, -1, -1)]).T
        poss_diag = np.array([diag, antidiag])
        poss_index = list(range(self.size))
        coords_to_check = set()
        for dof in range(self.n_dim-2, -1, -1):
            dof_fc = self.n_dim - dof
            cpt = 0
            for fc in subsets(poss_diag, dof_fc, repetition=True):
                if cpt == int(dof_fc / 2) + 1:
                    break
                cpt += 1
                frozen_comp = np.array(fc).reshape((dof_fc, self.size)).T
                if dof > 0:
                    for free_comp in subsets(poss_index, dof, repetition=True):
                        free_comp_array = np.repeat(np.array([free_comp]), self.size, axis=0)
                        coords = np.hstack((free_comp_array, frozen_comp))
                        for perm in multiset_permutations(coords.T.tolist()):
                            perm_coords = [list(i) for i in zip(*perm)]
                            perm_coords.sort()
                            coords_to_check.add(tuple(map(tuple, perm_coords)))
                else:
                    coords = frozen_comp
                    for perm in multiset_permutations(coords.T.tolist()):
                        perm_coords = [list(i) for i in zip(*perm)]
                        perm_coords.sort()
                        coords_to_check.add(tuple(map(tuple, perm_coords)))
                        
        for coords in coords_to_check:
            total = 0
            for tile in coords:
                total += self.board[tile]
            if abs(total) == self.size:
                if total > 0:
                    score_p1 += 1
                else:
                    score_p2 += 1
                
        return score_p1, score_p2

In [66]:
agent = Agent(size=3)
game = Game('Vic', agent, n_dim=2, size=3)
sarsa(game, agent, random_policy, alpha=0.5, alpha_factor=1.0, gamma=0.9, epsilon=1.0, epsilon_factor=0.9997, \
      r_win=100, r_lose=0, r_even=1, r_even2=2, num_episodes=10000)

..........tot vict : 2144  tot lose : 2829  tot even : 5027


In [67]:
agent.q_array

Unnamed: 0,00,01,02,10,11,12,20,21,22
000000000,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
000000210,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
100000212,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
100012212,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
110012212,0.0,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...
012221201,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
200010211,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
201012211,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
201212211,0.0,0.0,0.0,100.0,0.0,0.0,0.0,0.0,0.0


In [46]:
agent = Agent(size=3)
game = Game('Vic', agent, n_dim=2, size=3)
sarsa(game, agent, deterministic_policy, alpha=1.0, alpha_factor=0.999, gamma=0.95, epsilon=1.0, epsilon_factor=0.9998, \
      r_win=100, r_lose=0, r_even=1, r_even2=2, num_episodes=30000)

..............................tot vict : 279  tot lose : 14647  tot even : 15074


In [45]:
agent = Agent(size=3)
game = Game('Vic', agent, n_dim=2, size=3)
sarsa(game, agent, deterministic_policy, alpha=1.0, alpha_factor=0.999, gamma=0.95, epsilon=1.0, epsilon_factor=0.9998, \
      r_win=100, r_lose=0, r_even=1, r_even2=2, num_episodes=20000)

....................tot vict : 182  tot lose : 9753  tot even : 10065


In [42]:
# ..........tot vict : 99  tot lose : 4841  tot even : 5060

In [16]:
agent = Agent(size=3)
game = Game('Vic', agent, n_dim=2, size=3)
q_learning(game, agent, random_policy, alpha=1.0, alpha_factor=0.99999, gamma=0.8, epsilon=1.0, epsilon_factor=0.99997, \
           r_win=10, r_lose=-5, r_even=1, r_even2=2, num_episodes=100000)

....................................................................................................

In [65]:
(agent.q_array.sum(axis=1) != 0).sum()

66

In [72]:
agent2 = Agent(size=3, policy=random_policy)
game = Game(agent, agent2, n_dim=2, size=3)
game.play_a_game()

. . . 
. . . 
. . . 

Agent plays : (2, 2) 

. . . 
. . . 
. . X 

Agent plays : (0, 0) 

O . . 
. . . 
. . X 

Agent plays : (2, 1) 

O . . 
. . . 
. X X 

Agent plays : (0, 1) 

O O . 
. . . 
. X X 

Agent plays : (2, 0) 

O O . 
. . . 
X X X 

Game over. Score : (1, 0)
<__main__.Agent object at 0x0000010F194A1850> wins !
