In [130]:
import gym
import numpy as np
from itertools import product

from tqdm.notebook import tqdm

import copy

In [166]:
N_ROWS, N_COLS, N_WIN = 3, 3, 3

class PI:
    def __init__(self, Q, epsilon, alpha, gamma):
        self.Q = Q
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.last_q = 0
        self.last_state = None
        self.last_action = None
        
    def getActionGreedy(self, s, n=None):
        return np.argmax(Q[s])
    
    def get_epsillon_greedy(self, state, possible_moves, action_to_int):
        if (np.random.random() > self.epsilon):
            action = self.getActionGreedy(state)
        else:
            index = np.random.randint(0, high=possible_moves.shape[0])
            action = action_to_int(possible_moves[index])
        
        self.last_action = action
        self.last_state = state
        self.last_q = self.Q[self.last_state][self.last_action]
        
        return action
    
    def update_Q(self, reward, state):
        if self.last_state is not None:
            self.Q[self.last_state][self.last_action] = self.last_q + self.alpha * (reward + self.gamma * np.max(Q[state]) - self.last_q)
    

class TicTacToe(gym.Env):
    def __init__(self, n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN):
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.n_win = n_win

        self.board = np.zeros((self.n_rows, self.n_cols), dtype=int)
        self.gameOver = False
        self.boardHash = None
        # ход первого игрока
        self.curTurn = 1
        self.emptySpaces = None
        
        self.reset()

    def getEmptySpaces(self):
        if self.emptySpaces is None:
            res = np.where(self.board == 0)
            self.emptySpaces = np.array([ (i, j) for i,j in zip(res[0], res[1]) ])
        return self.emptySpaces

    def makeMove(self, player, i, j):
        self.board[i, j] = player
        self.emptySpaces = None
        self.boardHash = None

    def getHash(self):
        if self.boardHash is None:
            self.boardHash = ''.join(['%s' % (x+1) for x in self.board.reshape(self.n_rows * self.n_cols)])
        return self.boardHash

    def isTerminal(self):
        # проверим, не закончилась ли игра
        cur_marks, cur_p = np.where(self.board == self.curTurn), self.curTurn
        for i,j in zip(cur_marks[0], cur_marks[1]):
#             print((i,j))
            win = False
            if i <= self.n_rows - self.n_win:
                if np.all(self.board[i:i+self.n_win, j] == cur_p):
                    win = True
            if not win:
                if j <= self.n_cols - self.n_win:
                    if np.all(self.board[i,j:j+self.n_win] == cur_p):
                        win = True
            if not win:
                if i <= self.n_rows - self.n_win and j <= self.n_cols - self.n_win:
                    if np.all(np.array([ self.board[i+k,j+k] == cur_p for k in range(self.n_win) ])):
                        win = True
            if not win:
                if i <= self.n_rows - self.n_win and j >= self.n_win-1:
                    if np.all(np.array([ self.board[i+k,j-k] == cur_p for k in range(self.n_win) ])):
                        win = True
            if win:
                self.gameOver = True
                return self.curTurn

        if len(self.getEmptySpaces()) == 0:
            self.gameOver = True
            return 0

        self.gameOver = False
        return None

    def printBoard(self):
        for i in range(0, self.n_rows):
            print('----'*(self.n_cols)+'-')
            out = '| '
            for j in range(0, self.n_cols):
                if self.board[i, j] == 1:
                    token = 'x'
                if self.board[i, j] == -1:
                    token = 'o'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('----'*(self.n_cols)+'-')

    def getState(self):
        return (self.getHash(), self.getEmptySpaces(), self.curTurn)

    def action_from_int(self, action_int):
        return ( int(action_int / self.n_cols), int(action_int % self.n_cols))

    def int_from_action(self, action):
        return action[0] * self.n_cols + action[1]
    
    def step(self, action):
        if self.board[action[0], action[1]] != 0:
            return self.getState(), -10, True, {}
        self.makeMove(self.curTurn, action[0], action[1])
        reward = self.isTerminal()
        self.curTurn = -self.curTurn
        return self.getState(), 0 if reward is None else reward, reward is not None, {}

    def reset(self):
        self.board = np.zeros((self.n_rows, self.n_cols), dtype=int)
        self.boardHash = None
        self.gameOver = False
        self.emptySpaces = None
        self.curTurn = 1
        
    def get_random_Q(self, verbose=False):
        Q = {}
        iter_object = product('012', repeat=self.n_cols * self.n_rows)
        if verbose:
            iter_object = tqdm(iter_object)
            
        for s in iter_object:
            s = ''.join(s)
            Q[s] = []
            for i in s:
                if i == '1':
                    Q[s].append(np.random.random() * 0.5)
                else:
                    Q[s].append(float('-inf'))
        return Q

In [50]:
env = TicTacToe(n_rows=3, n_cols=3, n_win=3)
env.printBoard()

-------------
|   |   |   | 
-------------
|   |   |   | 
-------------
|   |   |   | 
-------------


In [55]:
obs, reward, done, info = env.step((2,2))
state, possible_moves, cur_turn = obs
print('reward:', reward, 'cur_turn:', cur_turn)
env.printBoard()

reward: 1 cur_turn: -1
-------------
| x |   |   | 
-------------
| o | x |   | 
-------------
| o |   | x | 
-------------


In [159]:
def Q_learning_episode(env, pi1=None, pi2=None):
    assert pi1 != None or pi2 != None
    
    env.reset()
    state, possible_moves, cur_turn = env.getState()
    
    done = False
    while not done:
        if cur_turn == 1:
            if pi1 is None:
                index = np.random.randint(0, high=possible_moves.shape[0])
                action = env.int_from_action(possible_moves[index])
            else:
                action = pi1.get_epsillon_greedy(state, possible_moves, env.int_from_action)
        else:
            if pi2 is None:
                index = np.random.randint(0, high=possible_moves.shape[0])
                action = env.int_from_action(possible_moves[index])
            else:
                action = pi2.get_epsillon_greedy(state, possible_moves, env.int_from_action)
        
        obs, reward, done, info = env.step(env.action_from_int(action)) 
        assert reward != -10
        
        if reward == 0:
            if cur_turn == 1:
                if pi2 is not None:
                    pi2.update_Q(reward, state)
            else:
                if pi1 is not None:
                    pi1.update_Q(reward, state)
        else:
            if pi1 is not None:
                pi1.update_Q(reward * cur_turn, state)
            if pi2 is not None:
                pi2.update_Q(reward * -cur_turn, state)
            
        state, possible_moves, cur_turn = obs
        
    return reward
        
def play_game(env, pi1, pi2, verbose=False):
    env.reset()
    state, possible_moves, cur_turn = env.getState()

    done = False
    while not done:

        if cur_turn == 1:
            action = pi1.getActionGreedy(state)
        else:
            action = pi2.getActionGreedy(state)

        obs, reward, done, info = env.step(env.action_from_int(action))
        state, possible_moves, cur_turn = obs
        
        if verbose:
            env.printBoard()
            print(env.getHash())
        
        assert reward != -10
        
    return reward
        
def eval_players(env, pi1, pi2, n=10000):
    pi1_wins = 0
    pi2_wins = 0
    draws = 0
    for _ in range(n):
        reward = play_game(env, pi1, pi2)
            
        if reward == 1:
            pi1_wins += 1
        elif reward == -1:
            pi2_wins += 1
        else:
            draws += 1
        
    return pi1_wins / n, pi2_wins / n, draws / n

In [167]:
env = TicTacToe(n_rows=3, n_cols=3, n_win=3)
n = 20 * 1000
epsillon = 0.1
gamma = 0.9
alpha = 0.3
verbose_rate = 0.1
n_test = 10000

Q = env.get_random_Q()
pi1 = PI(copy.deepcopy(Q), epsillon, alpha, gamma)
pi2 = PI(copy.deepcopy(Q), epsillon, alpha, gamma)

test_rewards = []
train_rewards = []

for i in tqdm(range(n)):
    pi1_reward = Q_learning_episode(env, pi1=pi1)
    pi2_reward = Q_learning_episode(env, pi2=pi2)
    
    train_rewards.append((pi1_reward, pi2_reward))
    test_rewards.append(eval_players(env, pi1, pi2, n=1))
    
    if i % int(n * verbose_rate) == int(n * verbose_rate) - 1:
        cur_stat = np.asarray(test_rewards[-int(n * verbose_rate):]).mean(axis=0)
        print(f'Cur test stat: p1: {cur_stat[0]}, p2: {cur_stat[1]}, draw: {cur_stat[2]}', flush=True)
        cur_stat = np.asarray(train_rewards[-int(n * verbose_rate):]).mean(axis=0)
        print(f'Cur train stat: p1: {cur_stat[0]}, p2: {cur_stat[1]}', flush=True)

HBox(children=(FloatProgress(value=0.0, max=20000.0), HTML(value='')))

Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.232, p2: 0.333
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.209, p2: 0.3185
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.1965, p2: 0.2865
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.2325, p2: 0.278
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.2005, p2: 0.2965
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.216, p2: 0.2605
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.2235, p2: 0.3175
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.224, p2: 0.2975
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.2105, p2: 0.2755
Cur test stat: p1: 0.0, p2: 0.0, draw: 1.0
Cur train stat: p1: 0.2305, p2: 0.267



In [170]:
pi2.Q['220102111']

[-inf,
 -inf,
 -inf,
 0.30061531676374553,
 -inf,
 -inf,
 0.3209665146735112,
 0.43266186603856965,
 0.1769221997612736]

In [169]:
play_game(env, pi1, pi2, verbose=True)

-------------
|   |   |   | 
-------------
|   |   | x | 
-------------
|   |   |   | 
-------------
111112111
-------------
|   |   | o | 
-------------
|   |   | x | 
-------------
|   |   |   | 
-------------
110112111
-------------
| x |   | o | 
-------------
|   |   | x | 
-------------
|   |   |   | 
-------------
210112111
-------------
| x |   | o | 
-------------
|   | o | x | 
-------------
|   |   |   | 
-------------
210102111
-------------
| x | x | o | 
-------------
|   | o | x | 
-------------
|   |   |   | 
-------------
220102111
-------------
| x | x | o | 
-------------
|   | o | x | 
-------------
|   | o |   | 
-------------
220102101
-------------
| x | x | o | 
-------------
|   | o | x | 
-------------
|   | o | x | 
-------------
220102102
-------------
| x | x | o | 
-------------
| o | o | x | 
-------------
|   | o | x | 
-------------
220002102
-------------
| x | x | o | 
-------------
| o | o | x | 
-------------
| x | o | x | 
-------------
220002202


0

In [99]:
rewards

[(0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0),
 (0.0, 0.0, 1.0)]