In [22]:
import numpy as np
import torch
from collections import defaultdict
from sympy.utilities.iterables import subsets
from sympy.utilities.iterables import multiset_permutations
import gym
device = 'cuda' if torch.cuda.is_available else 'cpu'

%load_ext line_profiler

The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


In [None]:
# To do : Q-table for tic-tac-toe in 2D. In 3D, the state space is above 20 millions.

# Hash the table : encode each 2 conseq mark (0, 1, -1) by a number in 0-9

In [20]:
Q_table = defaultdict(dict)
Q_table[10][3] = 10
Q_table

defaultdict(dict, {10: {3: 10}})

In [21]:
def greedy_policy(state, q_array):

    # TODO...
    action = np.argmax(q_array[state, :])

    return action


def epsilon_greedy_policy(state, q_array, epsilon):

    # TODO...
    action = greedy_policy(state, q_array) if np.random.rand(1)[0] < 1-epsilon else np.random.randint(q_array.shape[1])

    return action

In [None]:
DISPLAY_EVERY_N_EPISODES = 50

def sarsa(environment, alpha=0.1, alpha_factor=0.9995, gamma=0.99, epsilon=0.5, num_episodes=10000, display=False):
    num_states = environment.observation_space.n
    num_actions = environment.action_space.n
    q_array = np.zeros([num_states, num_actions])   # Initial Q table

    for episode_index in range(num_episodes):
        if display and episode_index % DISPLAY_EVERY_N_EPISODES == 0:
            qtable_display(q_array, title="Q table", cbar=True)
        else:
            print('.', end="")
        q_array_history.append(q_array.copy())
        alpha_history.append(alpha)

        # Update alpha
        if alpha_factor is not None:
            alpha = alpha * alpha_factor
        
        # TODO...
        S = environment.reset()
        A = epsilon_greedy_policy(S, q_array, epsilon)
        while True:
            S_new, R, done, _ = environment.step(A)
            A_new = epsilon_greedy_policy(S_new, q_array, epsilon)
            q_array[S, A] += alpha * (R + gamma * q_array[S_new, A_new] - q_array[S, A])
            S = S_new
            A = A_new
            if done:
                break

    return q_array

In [None]:
class Agent:
    
    def __init__(self, lr=.1, epsilon=0.3):
        self.Q_table = defaultdict(dict)
        self.lr = lr
        self.epsilon = epsilon
        
    def 
        

In [105]:
np.argwhere(np.array([[1, 0], [1, 0]]) == 0)

array([[0, 1],
       [1, 1]], dtype=int64)

In [114]:
np.random.choice([[1, 0], [1, 0]])

ValueError: a must be 1-dimensional

In [123]:
s = np.array([[1, 2], [3, 4]])

In [129]:
s[np.argwhere(s == 1)]

array([[[1, 2],
        [1, 2]]])

In [130]:
tuple(s)

(array([1, 2]), array([3, 4]))

In [152]:
class ActionSpace:
    
    def __init__(self, board):
        self.legal_moves = np.argwhere(board == 0)
    
    def sample(self):
        size = self.legal_moves.shape[0]
        random_idx = np.random.randint(size)
        return tuple(self.legal_moves[random_idx])

In [162]:
class Game(gym.Env):
    
    def __init__(self, player1, player2, size=3, n_dim=2):
        assert(type(n_dim) is int and n_dim >= 2), "wrong n_dim"
        assert(type(size) is int and size >= 2), "wrong size"
        self.n_dim = n_dim
        self.size = size
        self.player1 = player1
        self.player2 = player2
        self.player1_round = True
        self.board = np.zeros([size]*n_dim, dtype=int)
        self.current_score = (0, 0)
        self.action_space = ActionSpace(self.board)
        super(Game, self).__init__()
    
    
    def is_available(self, position):
        return self.board[position] == 0
    
    def is_done(self):
        return not 0 in self.board
    
    def reset(self):
        self.board *= 0
        self.action_space =  ActionSpace(self.board)
        
    
    def step(self, position):
        assert self.is_available(position), "Move is invalid"
        self.board[position] = 1 if self.player1_round else -1
        self.action_space = ActionSpace(self.board)
        self.player1_round = not self.player1_round

        score_p1, score_p2 = self.score()
        score_p1_diff, score_p2_diff =  score_p1 - self.current_score[0], score_p2 - self.current_score[1]
        self.current_score = (score_p1, score_p2)

        return self.board, \
        score_p1_diff * self.player1_round + score_p2_diff * (not self.player1_round), \
        self.is_done(), \
        None
                
    
    def render(self):
        print(self.board)
        
    
    def score(self):
        score_p1 = 0
        score_p2 = 0
        
        def slice_to_mask(L):
            """
            Enables to use slicing operator like array[x, y, :, z] with choosing the position
            of the symbol ':' (represented with a -1 instead). For example L can be equal to
            [0, 0, -1, 0] if we want to access self.board[0, 0, :, 0]
            """
            mask = np.zeros([self.size] * self.n_dim, dtype=bool)
            dim = L.index(-1)
            for tile in range(self.size):
                L[dim] = tile
                mask[tuple(L)] = True
            return mask
        
        # vertical and horizontal axis
        all_axis = []
        for d in range(self.size ** self.n_dim):
            all_axis.append([(d // self.size**k) % self.size for k in range(self.n_dim)[::-1]])
            # example in 3D case with size 3 :
            # all_axis = [ [i, j, k] for i = 0, 1, 2 for j = 0, 1, 2 for k = 0, 1, 2 ]
        for d in range(self.n_dim):
            d_axis = np.array(all_axis)
            d_axis[:, d] = -1
            d_axis = np.unique(d_axis, axis=0)
            for axis in d_axis:
                space_mask = slice_to_mask(list(axis))
                in_game_axis = self.board[space_mask]
                axis_value = in_game_axis.sum().item()
                if axis_value == self.size:
                    score_p1 += 1
                elif axis_value == -self.size:
                    score_p2 += 1
        
        # diagonal axis
        diag = np.array([range(self.size)]).T
        antidiag = np.array([range(self.size-1, -1, -1)]).T
        poss_diag = np.array([diag, antidiag])
        poss_index = list(range(self.size))
        coords_to_check = set()
        for dof in range(self.n_dim-2, -1, -1):
            dof_fc = self.n_dim - dof
            cpt = 0
            for fc in subsets(poss_diag, dof_fc, repetition=True):
                if cpt == int(dof_fc / 2) + 1:
                    break
                cpt += 1
                frozen_comp = np.array(fc).reshape((dof_fc, self.size)).T
                if dof > 0:
                    for free_comp in subsets(poss_index, dof, repetition=True):
                        free_comp_array = np.repeat(np.array([free_comp]), self.size, axis=0)
                        coords = np.hstack((free_comp_array, frozen_comp))
                        for perm in multiset_permutations(coords.T.tolist()):
                            perm_coords = [list(i) for i in zip(*perm)]
                            perm_coords.sort()
                            coords_to_check.add(tuple(map(tuple, perm_coords)))
                else:
                    coords = frozen_comp
                    for perm in multiset_permutations(coords.T.tolist()):
                        perm_coords = [list(i) for i in zip(*perm)]
                        perm_coords.sort()
                        coords_to_check.add(tuple(map(tuple, perm_coords)))
                        
        for coords in coords_to_check:
            total = 0
            for tile in coords:
                total += self.board[tile]
            if abs(total) == self.size:
                if total > 0:
                    score_p1 += 1
                else:
                    score_p2 += 1
                
        return score_p1, score_p2

In [163]:
import gym
env = Game(None, None)
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t+1))
            break
env.close()

[[0 0 0]
 [0 0 0]
 [0 0 0]]
None
[[0 0 0]
 [0 0 1]
 [0 0 0]]
[[0 0 0]
 [0 0 1]
 [0 0 0]]
[[ 0  0  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0  0  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0  1  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0  1  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0  1  0]
 [ 0  0  1]
 [ 0 -1 -1]]
[[ 0  1  0]
 [ 0  0  1]
 [ 0 -1 -1]]
[[ 0  1  1]
 [ 0  0  1]
 [ 0 -1 -1]]
[[ 0  1  1]
 [ 0  0  1]
 [ 0 -1 -1]]
[[ 0  1  1]
 [-1  0  1]
 [ 0 -1 -1]]
[[ 0  1  1]
 [-1  0  1]
 [ 0 -1 -1]]
[[ 0  1  1]
 [-1  0  1]
 [ 1 -1 -1]]
[[ 0  1  1]
 [-1  0  1]
 [ 1 -1 -1]]
[[-1  1  1]
 [-1  0  1]
 [ 1 -1 -1]]
[[-1  1  1]
 [-1  0  1]
 [ 1 -1 -1]]
Episode finished after 9 timesteps
[[0 0 0]
 [0 0 0]
 [0 0 0]]
None
[[ 0 -1  0]
 [ 0  0  0]
 [ 0  0  0]]
[[ 0 -1  0]
 [ 0  0  0]
 [ 0  0  0]]
[[ 0 -1  0]
 [ 0  0  1]
 [ 0  0  0]]
[[ 0 -1  0]
 [ 0  0  1]
 [ 0  0  0]]
[[ 0 -1  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0 -1  0]
 [ 0  0  1]
 [ 0 -1  0]]
[[ 0 -1  0]
 [ 0  1  1]
 [ 0 -1  0]]
[[ 0 -1  0]
 [ 0  1  1]
 [ 0 -1  0]]
[[ 0 -1  0]
 [-1  1  1]
 [ 0 

In [101]:
space = gym.spaces.Box(0, 2, [2], int)
space.sample()


AttributeError: 'property' object has no attribute 'sample'

In [10]:
game = Game(None, None, 3, 3)
game.board[0, 2, 2] = -1
game.board[1, 1, 1] = -1
game.board[2, 0, 0] = 1

In [26]:
game.board

array([[[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0, -1]],

       [[ 0,  0,  0],
        [ 0, -1,  0],
        [ 0,  0,  0]],

       [[ 1,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0]]])

In [7]:
id(game.board)

2201223149168

In [15]:
help(id)

Help on built-in function id in module builtins:

id(obj, /)
    Return the identity of an object.
    
    This is guaranteed to be unique among simultaneously existing objects.
    (CPython uses the object's memory address.)



In [5]:
game = Game(None, None, 3, 9)
game.board[0, 0, 0, 0, 0, 0, 0, 2, 2] = -1
game.board[0, 0, 0, 0, 0, 0, 1, 1, 1] = -1
game.board[0, 0, 0, 0, 0, 0, 2, 0, 0] = -1
game.board[0, 0, 0, 0, 0, 0, 0, 0, 0] = 1
game.board[1, 1, 1, 1, 1, 1, 1, 1, 1] = 1
game.board[2, 2, 2, 2, 2, 2, 2, 2, 2] = 1

In [8]:
%time game.score()

Wall time: 24 s


(1, 1)