In [None]:
import numpy as np
from collections import deque
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

import pickle

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [None]:
# Tic-tac-toe game (square board, configurable number of marks in a row to win)
class TicTacToe:
    """Two-player tic-tac-toe environment on a ``board_size x board_size`` board.

    The board is a 2-D NumPy array whose cells hold -1 (the player moving first,
    drawn as X), 1 (the other player, drawn as O) or 0 (empty).  A game state is
    the pair ``(board, turn)`` with ``turn`` in {-1, 1}; ``play_turn`` returns
    ``next_turn == 0`` once the game is over.

    Attributes:
        players: dict mapping turn (-1 / 1) to the player object for that side.
            The two sides are swapped after every finished game in ``play``.
        wins: dict mapping ``player.name`` to the number of games won.
        board_size: side length of the board.
        win_size: number of equal marks in a line required to win.
    """

    def __init__(self, player_1, player_2, board_size=3, win_size=3):
        """Create a game between two players.

        Args:
            player_1: object with ``name`` and ``get_action(state)``; moves first (turn -1).
            player_2: object with ``name`` and ``get_action(state)``; moves second (turn 1).
            board_size: side length of the square board.
            win_size: length of the line needed to win.

        Raises:
            ValueError: if ``win_size`` is not in ``1..board_size`` (no winning
                line would fit on the board).
        """
        if not 1 <= win_size <= board_size:
            raise ValueError(
                f'win_size must be between 1 and board_size ({board_size}), got {win_size}'
            )

        self.players = {-1: player_1,
                         1: player_2}

        self.wins = {player_1.name: 0,
                     player_2.name: 0}

        self.board_size = board_size
        self.win_size = win_size
        self._kernel = self._create_kernel()


    # Builds the convolution kernels used to detect wins
    def _create_kernel(self):
        """Return an array of shape ``(2 * win_size + 2, win_size, win_size)``.

        Each slice is a 0/1 mask of one winning line inside a
        ``win_size x win_size`` window: ``win_size`` rows, ``win_size`` columns,
        the main diagonal and the anti-diagonal.
        """
        kernel = np.zeros((2 * self.win_size + 2, self.win_size, self.win_size))
        for i in range(self.win_size):
            kernel[i, i, :] = np.ones(self.win_size)                       # i-th row
        for i in range(self.win_size, 2 * self.win_size):
            kernel[i, :, i - self.win_size] = np.ones(self.win_size).T     # (i - win_size)-th column
        kernel[2 * self.win_size] = np.eye(self.win_size)                  # main diagonal
        kernel[2 * self.win_size + 1] = np.fliplr(np.eye(self.win_size))   # anti-diagonal
        return kernel


    # Checks whether the player `turn` (-1 or 1) has a winning line on the board `state`
    def _test_win(self, state, turn):
        """Return ``-turn`` if ``turn`` has ``win_size`` marks in a line, else 0.

        The result is only used for its truthiness by ``play_turn``.
        """
        rows, cols, w_size = *state.shape, self.win_size
        # View of every w_size x w_size window of the board (no data is copied):
        # expanded_states[x, y, i, j] == state[x + i, y + j]
        expanded_states = np.lib.stride_tricks.as_strided(
            state,
            shape=(rows - w_size + 1, cols - w_size + 1, w_size, w_size),
            strides=(*state.strides, *state.strides),
            writeable=False,
        )
        # Sum of the board values under every line mask at every window position
        feature_map = np.einsum('xyij,sij->sxy', expanded_states, self._kernel)
        # Cells are in {-1, 0, 1}, so a sum of turn * w_size means the whole line belongs to `turn`
        return -turn * (feature_map == turn * w_size).any().astype(int)


    # Plays several complete episodes
    def play(self, num_games=1, visualize=False):
        """Play ``num_games`` full games and return every transition.

        Each transition is ``(state, action, reward, next_state, done)`` where
        ``state`` is the board multiplied by the mover's sign (the mover's own
        marks are 1) and ``next_state`` is the resulting board multiplied by the
        opposite sign (the opponent's marks are 1).

        Side effects: updates ``self.wins`` and swaps the two sides in
        ``self.players`` after every game; prints the boards when ``visualize``.
        """
        transitions = []
        for _ in range(num_games):
            next_turn = turn = -1
            state = (np.zeros((self.board_size, self.board_size)), turn)  # initial state = (board, turn)
            if visualize:
                self.visualize_state(state, turn)
            while next_turn != 0:
                state_2d, turn = state
                current_player = self.players[turn]
                action = current_player.get_action(state)
                next_state_2d, next_turn, reward = self.play_turn(state, action)
                # state, action, reward, new_state, done
                transitions.append((turn * state_2d, action, reward, -turn * next_state_2d, next_turn == 0))
                if visualize:
                    self.visualize_state((next_state_2d, next_turn), turn)
                if next_turn == 0:
                    if visualize:
                        if reward == 0:
                            print('Ничья!\n')
                        else:
                            print(f'Победа ({self.players[reward * turn].name})!\n')
                    if reward != 0:
                        # reward is from the mover's point of view: 1 -> mover won, -1 -> opponent won
                        self.wins[self.players[reward * turn].name] += 1
                    # Swap sides so the other player moves first in the next game
                    self.players = {-1: self.players[1], 1: self.players[-1]}
                state = next_state_2d, next_turn
        return transitions


    # Performs one move and checks for an illegal move (loss) / win / draw
    def play_turn(self, state, action):
        """Apply ``action`` for the player to move.

        Args:
            state: ``(board, turn)``; the board is not modified.
            action: ``(row, col)`` index of the chosen cell.  It is used directly
                as a NumPy index, so it must lie on the board (negative indices
                follow NumPy's wrap-around semantics).

        Returns:
            ``(next_board, next_turn, reward)`` with reward from the mover's
            point of view:
              * occupied cell -> board unchanged, ``next_turn = 0``, reward -1;
              * winning move  -> ``next_turn = 0``, reward 1;
              * board full    -> ``next_turn = 0``, reward 0 (draw);
              * otherwise     -> ``next_turn = -turn``, reward 0.
        """
        state2d, turn = state
        next_state2d = state2d.copy()

        # Illegal move: the cell is already taken, the mover loses
        if state2d[action] != 0:
            return next_state2d, 0, -1

        # Make the move
        next_state2d[action] = turn

        # Win check
        if self._test_win(next_state2d, turn):
            return next_state2d, 0, 1

        # Draw check
        if (next_state2d != 0).all():
            return next_state2d, 0, 0

        # Otherwise the other player moves next
        return next_state2d, -turn, 0


    # Prints the game state after a player's move
    @staticmethod
    def visualize_state(next_state, turn):
        """Print the board from ``next_state = (board, next_turn)``.

        -1 is drawn as X, 1 as O and empty cells as '.'.  The rendering rewrites
        NumPy's default string form of the array, so it depends on NumPy's print
        settings (very wide boards may be wrapped by NumPy).
        """
        next_state2d, next_turn = next_state
        print(f"player {turn}'s turn:")
        if (next_state2d == 0).all() and turn == 0:
            print("[invalid state]\n\n")
        else:
            print(str(next_state2d)
                  .replace(".", "")
                  .replace("[[", "")
                  .replace(" [", "")
                  .replace("]]", "")
                  .replace("]", "")
                  .replace("-0", " .")
                  .replace("0", ".")
                  .replace("-1", " X")
                  .replace("1", "O")
            )


    @staticmethod
    def print_transitions(transitions):
        """Print every transition: resulting board, 1-based action, reward and game status."""
        states, actions, rewards, next_states, dones = zip(*transitions)
        for i in np.arange(len(states)):
            # Red transition number, then reset the terminal colour (not force black)
            print("\033[31m{}.".format(i + 1), '\033[0m')
            TicTacToe.visualize_state((next_states[i], -1), 1)
            print('\naction = ', actions[i] + np.array([1, 1]), end='\n')
            print('reward = ', rewards[i], end='\n')
            if dones[i]:
                print('Игра окончена', end='\n\n')
            else:
                print('Игра продолжается', end='\n\n')

In [None]:
class Human:
    """Console player: reads 1-based "row column" moves from standard input."""

    def __init__(self, name='Human'):
        self.name = name

    def get_action(self, state):
        """Prompt until the user enters a free cell that lies on the board.

        Args:
            state: ``(board, turn)``; only the board is used.

        Returns:
            ``(row, col)`` as 0-based indices into the board.
        """
        state2d, turn = state
        n_rows, n_cols = state2d.shape
        while True:
            print('Введите ваш ход (Строка, столбец)')
            try:
                row, col = map(int, input().split())
            except ValueError:
                # Not exactly two integers
                print('Некорректный ввод!')
                continue
            # Reject 0 / negative / too large values: after the -1 shift they would
            # either wrap around to the opposite edge or raise IndexError
            if not (1 <= row <= n_rows and 1 <= col <= n_cols):
                print('Некорректный ввод!')
                continue
            if state2d[row - 1, col - 1] != 0:
                print('Клетка занята!')
                continue
            return row - 1, col - 1

In [None]:
class Random:
    """Player that picks uniformly at random among the empty cells."""

    def __init__(self, name='Random'):
        self.name = name

    def get_action(self, state):
        """Return the ``(row, col)`` of a randomly chosen empty cell of ``state``'s board."""
        board, _ = state
        free_cells = np.argwhere(board == 0)
        pick = np.random.randint(len(free_cells))
        return tuple(free_cells[pick])

In [None]:
class DQNAgent(nn.Module):
    """Fully convolutional Q-network with an epsilon-greedy policy.

    The network maps a board to one Q-value per cell, so the same weights work
    for any board size ('same' padding keeps the spatial shape).

    Attributes:
        name: player name used by the game for bookkeeping.
        epsilon: probability of taking a random move in ``get_action``.
        n_channels: number of input planes produced in ``forward``.
    """

    def __init__(self, epsilon=0, name='DQNAgent'):
        super().__init__()

        self.name = name
        self.epsilon = epsilon

        self.n_channels = 3

        self.network = nn.Sequential(
            nn.Conv2d(self.n_channels, 32, kernel_size=(3, 3), padding='same'),
            nn.ReLU(),
            nn.Conv2d(32, 128, kernel_size=(3, 3), padding='same'),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=(3, 3), padding='same'),
            nn.ReLU(),
            nn.Conv2d(128, 1, kernel_size=(3, 3), padding='same')
        )

    def forward(self, x):
        """Return Q-values of shape ``(batch, rows, cols)`` for boards ``x`` of the same shape.

        The board is one-hot encoded into three planes: cells equal to 1,
        cells equal to -1 and empty cells.
        """
        x = torch.stack([x == 1, x == -1, x == 0], dim=1).float()
        return self.network(x).squeeze(1)

    def greedy_action(self, state, device=None):
        """Return the ``(row, col)`` with the highest predicted Q-value.

        Args:
            state: ``(board, turn)``.  The board is multiplied by ``turn`` so the
                mover's own marks are always 1 from the network's point of view.
            device: device for the input tensor; defaults to the device that
                holds the network's parameters.

        Occupied cells are not masked here; the game treats a move onto an
        occupied cell as a loss for the mover.
        """
        state2d, turn = state
        if device is None:
            # Follow the weights instead of a module-level global, so the call
            # keeps working after the network is moved with .to(...)
            device = next(self.parameters()).device
        state_t = torch.as_tensor(turn * state2d, dtype=torch.float32, device=device).unsqueeze(0)
        with torch.no_grad():  # pure inference: no autograd graph needed
            q_values = self(state_t).squeeze(0).cpu().numpy()
        return np.unravel_index(q_values.argmax(), q_values.shape)

    def random_action(self, state):
        """Return a uniformly random empty cell of the board as ``(row, col)``."""
        state2d, turn = state
        zero_idxs = np.argwhere(state2d == 0)
        return tuple(zero_idxs[np.random.randint(len(zero_idxs))])

    def get_action(self, state):
        """Epsilon-greedy move: random with probability ``epsilon``, greedy otherwise."""
        if random.random() < self.epsilon:
            return self.random_action(state)
        return self.greedy_action(state)

In [None]:
class ReplayBuffer(object):
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, size):
        # A bounded deque drops the oldest transition once `size` is reached
        self._storage = deque(maxlen=size)

    def __len__(self):
        return len(self._storage)

    def add(self, transition):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self._storage.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them as five stacked arrays."""
        picked = random.sample(self._storage, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*picked))
        return states, actions, rewards, next_states, dones

In [None]:
# Seed every random number generator used in the notebook for reproducibility
seed = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(seed)

In [None]:
# Game configuration: 5x5 board, four marks in a line win
board_size = 5
win_size = 4

In [None]:
# DQN hyperparameters

batch_size = 128                    # transitions sampled from the replay buffer per gradient step
total_steps = 30_000                # number of training games (the training loop plays one game per step)

# Exploration schedule: epsilon falls linearly from init_epsilon to final_epsilon over decay_steps games
decay_steps = 18_000
init_epsilon = 1
final_epsilon = 0.02

loss_freq = 400                     # log the loss every loss_freq games
refresh_target_network_freq = 500   # copy the online weights into the target network every N games

eval_freq = 1000                    # evaluate against the Random player and save a checkpoint every N games
n_eval_games = 50                   # games per evaluation

max_grad_norm = 50                  # gradient-norm clipping threshold

# Discount factor. compute_td_loss has its own default (0.9), so this value only
# takes effect where it is passed explicitly as gamma=gamma.
gamma = 1.0

In [None]:
# Online Q-network: the one that is trained and that selects moves
agent = DQNAgent(init_epsilon).to(device)

# Target network: a second DQNAgent that starts as an exact copy of the online weights
target_network = DQNAgent(init_epsilon).to(device)
target_network.load_state_dict(agent.state_dict())

# Adam optimises only the online network; the replay buffer keeps at most 16,000 transitions
optimizer = torch.optim.Adam(agent.parameters(), lr=1e-4)
exp_replay = ReplayBuffer(16_000)

In [None]:
# Total number of trainable parameters in the online network
sum(p.numel() for p in agent.parameters() if p.requires_grad)

186625

In [None]:
# Returns the temporal-difference loss
def compute_td_loss(states, actions, rewards, next_states, dones,
                    agent, target_network, gamma=0.9, device=None):
    """Mean squared TD error for a batch of two-player (negamax) transitions.

    ``next_states`` are expected from the *opponent's* point of view, so the
    best value the opponent can reach is subtracted:

        target = reward - (1 - done) * gamma * max_a' Q_target(next_state, a')

    Args:
        states: boards, shape ``[batch, rows, cols]``.
        actions: chosen cells as ``(row, col)``, shape ``[batch, 2]``.
        rewards: shape ``[batch]``.
        next_states: boards after the move, shape ``[batch, rows, cols]``.
        dones: episode-finished flags, shape ``[batch]``.
        agent: online network; gradients flow only through it.
        target_network: network used to evaluate ``next_states``.
        gamma: discount factor.
        device: device for the batch tensors; defaults to the device of the
            agent's parameters (CPU if the agent has none).

    Returns:
        Scalar tensor with the loss.
    """
    if device is None:
        # Follow the network rather than a module-level global
        first_param = next(iter(agent.parameters()), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    states = torch.as_tensor(states, device=device, dtype=torch.float32)
    actions = torch.as_tensor(actions, device=device, dtype=torch.int64)
    rewards = torch.as_tensor(rewards, device=device, dtype=torch.float32)
    next_states = torch.as_tensor(next_states, device=device, dtype=torch.float32)
    dones = torch.as_tensor(dones, device=device, dtype=torch.float32)

    # Q(s, a) of the moves that were actually played
    batch_idx = torch.arange(actions.shape[0], device=device)
    predicted_qvalues_for_actions = agent(states)[batch_idx, actions[:, 0], actions[:, 1]]

    # The target is a constant w.r.t. the optimised weights: skip building its graph
    with torch.no_grad():
        predicted_next_qvalues = target_network(next_states)
        next_state_values = predicted_next_qvalues.reshape(dones.shape[0], -1).max(dim=1).values
        target_qvalues_for_actions = rewards - (1 - dones) * gamma * next_state_values

    return torch.mean((predicted_qvalues_for_actions - target_qvalues_for_actions) ** 2)

# Computes epsilon for the current step
def linear_decay(init_epsilon, final_epsilon, step, decay_steps):
    """Linearly interpolate from ``init_epsilon`` towards ``final_epsilon``.

    The value reaches ``final_epsilon`` after ``decay_steps`` steps and never
    goes below it.
    """
    decayed = init_epsilon - step * (init_epsilon - final_epsilon) / decay_steps
    return final_epsilon if final_epsilon > decayed else decayed

# Обучение

In [None]:
# Google Colab only: mount Google Drive so that checkpoints can be written under /content/drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Training environment: the agent is registered as the first player, a Random player as the second
game = TicTacToe(agent, Random(), board_size=board_size, win_size=win_size)

In [None]:
# Filename *prefix* (not a directory) for everything saved on the mounted drive:
# file names are appended directly, e.g. PATH + 'model_0'
PATH = f'/content/drive/MyDrive/TicTacToe_2024/TicTacToe'

# Most recent training loss; stays None until the first gradient step
loss = None

# Histories filled by the training cells
loss_values = []     # loss logged every loss_freq games
reward_values = []   # mean evaluation reward logged every eval_freq games

In [None]:
# Training run, games 0 .. total_steps - 1. Every move is logged to out.txt.
# NOTE(review): moves for BOTH sides are chosen by `agent` (self-play), although `game`
# was created with a Random opponent — confirm this is intended.
with open('out.txt', 'w') as f:
    for t in range(total_steps):
        print(f't = {t}. Ход {game.players[-1].name}', file=f)

        # Epsilon depends only on the game index, so set it once per game. This also
        # restores exploration after an evaluation (which sets epsilon to 0).
        agent.epsilon = linear_decay(init_epsilon, final_epsilon, t, decay_steps)

        state = (np.zeros((board_size, board_size)), -1)  # initial state = (state_2d, turn)
        next_turn = -1

        while next_turn != 0:
            # Unpack first so that `turn` is the player making THIS move
            state_2d, turn = state

            action = agent.get_action(state)
            print(action, file=f)
            next_state_2d, next_turn, reward = game.play_turn(state, action)

            if next_turn == 0:
                if reward == 0:
                    print('Ничья!\n', file=f)
                else:
                    print(f'Победа ({game.players[reward * turn].name})!\n', file=f)
                # Swap sides for the next game
                game.players = {-1: game.players[1], 1: game.players[-1]}

            # state, action, reward, new_state, done. Both boards are stored from the point
            # of view of the player to move; terminal next states (next_turn == 0) are zeroed
            # and ignored by the loss because done is True.
            exp_replay.add((turn * state_2d, action, reward, next_turn * next_state_2d, next_turn == 0))

            # One gradient step on a minibatch
            if len(exp_replay) >= batch_size:
                states, actions, rewards, next_states, dones = exp_replay.sample(batch_size)
                # Pass the configured discount explicitly; the function default is 0.9
                loss = compute_td_loss(states, actions, rewards, next_states, dones,
                                       agent, target_network, gamma=gamma)
                loss.backward()
                grad_norm = nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
                optimizer.step()
                optimizer.zero_grad()

            state = next_state_2d, next_turn

        # Refresh the target network every refresh_target_network_freq games
        if t % refresh_target_network_freq == 0:
            target_network.load_state_dict(agent.state_dict())

        # Log the loss at the configured frequency (as a plain float, not a graph-attached tensor)
        if t % loss_freq == 0 and t > 0 and loss is not None:
            loss_value = loss.item()
            loss_values.append(loss_value)
            print(f"t = {str(t):5}\t loss = {loss_value}\t eps = {round(agent.epsilon, 4)}")

        # Evaluate greedily against a Random player and save a checkpoint
        if t % eval_freq == 0:
            agent.epsilon = 0
            eval_game = TicTacToe(agent, Random(), board_size=board_size, win_size=win_size)
            eval_game.play(n_eval_games)
            mean_reward = (eval_game.wins['DQNAgent'] - eval_game.wins['Random']) / n_eval_games
            reward_values.append(mean_reward)
            print(f"t = {str(t):5}\t reward = {round(mean_reward, 4)}\n")

            torch.save(agent.state_dict(), PATH + f'model_{t}')
            torch.save(optimizer.state_dict(), PATH + f'opt_{t}')

t = 0    	 reward = -0.96

t = 400  	 loss = 0.003499798011034727	 eps = 0.9782
t = 800  	 loss = 0.006664647720754147	 eps = 0.9564
t = 1000 	 reward = 0.56

t = 1200 	 loss = 0.010979915037751198	 eps = 0.9347
t = 1600 	 loss = 0.019739609211683273	 eps = 0.9129
t = 2000 	 loss = 0.007432692684233189	 eps = 0.8911
t = 2000 	 reward = 0.8

t = 2400 	 loss = 0.00967826135456562	 eps = 0.8693
t = 2800 	 loss = 0.007575429044663906	 eps = 0.8476
t = 3000 	 reward = 0.84

t = 3200 	 loss = 0.014437269419431686	 eps = 0.8258
t = 3600 	 loss = 0.010891581885516644	 eps = 0.804
t = 4000 	 loss = 0.00514253880828619	 eps = 0.7822
t = 4000 	 reward = 0.88

t = 4400 	 loss = 0.005044175777584314	 eps = 0.7604
t = 4800 	 loss = 0.003672633785754442	 eps = 0.7387
t = 5000 	 reward = 0.88

t = 5200 	 loss = 0.010673675686120987	 eps = 0.7169
t = 5600 	 loss = 0.009115155786275864	 eps = 0.6951
t = 6000 	 loss = 0.00799708440899849	 eps = 0.6733
t = 6000 	 reward = 0.84

t = 6400 	 loss = 0.0060960

In [None]:
# Continued training run, games 30_000 .. 59_999. Every move is logged to out2.txt.
# NOTE(review): moves for BOTH sides are chosen by `agent` (self-play), although `game`
# was created with a Random opponent — confirm this is intended.
with open('out2.txt', 'w') as f:
    for t in range(30_000, 60_000):
        print(f't = {t}. Ход {game.players[-1].name}', file=f)

        # Epsilon depends only on the game index, so set it once per game. This also
        # restores exploration after an evaluation (which sets epsilon to 0).
        agent.epsilon = linear_decay(init_epsilon, final_epsilon, t, decay_steps)

        state = (np.zeros((board_size, board_size)), -1)  # initial state = (state_2d, turn)
        next_turn = -1

        while next_turn != 0:
            # Unpack first so that `turn` is the player making THIS move
            state_2d, turn = state

            action = agent.get_action(state)
            print(action, file=f)
            next_state_2d, next_turn, reward = game.play_turn(state, action)

            if next_turn == 0:
                if reward == 0:
                    print('Ничья!\n', file=f)
                else:
                    print(f'Победа ({game.players[reward * turn].name})!\n', file=f)
                # Swap sides for the next game
                game.players = {-1: game.players[1], 1: game.players[-1]}

            # state, action, reward, new_state, done. Both boards are stored from the point
            # of view of the player to move; terminal next states (next_turn == 0) are zeroed
            # and ignored by the loss because done is True.
            exp_replay.add((turn * state_2d, action, reward, next_turn * next_state_2d, next_turn == 0))

            # One gradient step on a minibatch
            if len(exp_replay) >= batch_size:
                states, actions, rewards, next_states, dones = exp_replay.sample(batch_size)
                # Pass the configured discount explicitly; the function default is 0.9
                loss = compute_td_loss(states, actions, rewards, next_states, dones,
                                       agent, target_network, gamma=gamma)
                loss.backward()
                grad_norm = nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
                optimizer.step()
                optimizer.zero_grad()

            state = next_state_2d, next_turn

        # Refresh the target network every refresh_target_network_freq games
        if t % refresh_target_network_freq == 0:
            target_network.load_state_dict(agent.state_dict())

        # Log the loss at the configured frequency (as a plain float, not a graph-attached tensor)
        if t % loss_freq == 0 and t > 0 and loss is not None:
            loss_value = loss.item()
            loss_values.append(loss_value)
            print(f"t = {str(t):5}\t loss = {loss_value}\t eps = {round(agent.epsilon, 4)}")

        # Evaluate greedily against a Random player and save a checkpoint
        if t % eval_freq == 0:
            agent.epsilon = 0
            eval_game = TicTacToe(agent, Random(), board_size=board_size, win_size=win_size)
            eval_game.play(n_eval_games)
            mean_reward = (eval_game.wins['DQNAgent'] - eval_game.wins['Random']) / n_eval_games
            reward_values.append(mean_reward)
            print(f"t = {str(t):5}\t reward = {round(mean_reward, 4)}\n")

            torch.save(agent.state_dict(), PATH + f'model_{t}')
            torch.save(optimizer.state_dict(), PATH + f'opt_{t}')

t = 30000	 loss = 0.00037415343103930354	 eps = 0.02
t = 30000	 reward = 1.0

t = 30400	 loss = 0.000154425113578327	 eps = 0.02
t = 30800	 loss = 9.560085163684562e-05	 eps = 0.02
t = 31000	 reward = 0.88

t = 31200	 loss = 6.0891059547429904e-05	 eps = 0.02
t = 31600	 loss = 0.0005621250020340085	 eps = 0.02
t = 32000	 loss = 0.0004982273676432669	 eps = 0.02
t = 32000	 reward = 0.94

t = 32400	 loss = 0.00011836771591333672	 eps = 0.02
t = 32800	 loss = 0.0003899220610037446	 eps = 0.02
t = 33000	 reward = 1.0

t = 33200	 loss = 5.6718930864008144e-05	 eps = 0.02
t = 33600	 loss = 0.00036711295251734555	 eps = 0.02
t = 34000	 loss = 0.00025227334117516875	 eps = 0.02
t = 34000	 reward = 1.0

t = 34400	 loss = 0.00024430619669146836	 eps = 0.02
t = 34800	 loss = 0.00016894566942937672	 eps = 0.02
t = 35000	 reward = 0.96

t = 35200	 loss = 0.004832028411328793	 eps = 0.02
t = 35600	 loss = 0.0016375058330595493	 eps = 0.02
t = 36000	 loss = 4.0300168620888144e-05	 eps = 0.02
t = 3600

In [None]:
# Feed an empty 5x5 board (as a batch of one) to the network and show the cell it rates highest
state2d = torch.tensor(np.zeros((1, 5, 5), dtype=int)).to(device)

q_values = agent(state2d).squeeze(0).detach().cpu().numpy()
# Masking of occupied cells is left disabled: q_values[state2d != 0] = -float("Inf")
np.unravel_index(q_values.argmax(), q_values.shape)

(2, 2)

In [None]:
# Display the Q-value computed for every cell of the empty board
q_values

array([[-0.08061364, -0.05459422, -0.02210075, -0.04188852, -0.0907537 ],
       [-0.01836986,  0.00618161,  0.08391692,  0.05746449, -0.09141948],
       [-0.05466395,  0.02869976,  0.10814499,  0.08277152, -0.11566573],
       [-0.03168222,  0.08481603,  0.04625217,  0.06391506, -0.1531809 ],
       [-0.07397533, -0.01455764,  0.02738114, -0.03418991, -0.01185933]],
      dtype=float32)

In [None]:
# Restore the weights saved after game 14000; map_location loads the tensors onto the CPU
agent.load_state_dict(torch.load(PATH + 'model_14000', map_location=torch.device('cpu')))
# Alternative checkpoint (disabled):
#agent.load_state_dict(torch.load(PATH + 'model_50000'))

<All keys matched successfully>

In [None]:
# Play 5 interactive games against the agent acting greedily (epsilon = 0), printing every board
agent.epsilon = 0
test_game = TicTacToe(agent, Human(), board_size=board_size, win_size=win_size)
test_game.play(5, True)

player -1's turn:
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
player -1's turn:
 .  .  .  .  .
 .  .  .  .  .
 .  .  X  .  .
 .  .  .  .  .
 .  .  .  .  .
Введите ваш ход (Строка, столбец)
2 2
player 1's turn:
 .  .  .  .  .
 .  O  .  .  .
 .  .  X  .  .
 .  .  .  .  .
 .  .  .  .  .
player -1's turn:
 .  .  .  .  .
 .  O  X  .  .
 .  .  X  .  .
 .  .  .  .  .
 .  .  .  .  .
Введите ваш ход (Строка, столбец)
3 4
player 1's turn:
 .  .  .  .  .
 .  O  X  .  .
 .  .  X  O  .
 .  .  .  .  .
 .  .  .  .  .
player -1's turn:
 .  .  .  .  .
 .  O  X  .  .
 .  .  X  O  .
 .  .  X  .  .
 .  .  .  .  .
Введите ваш ход (Строка, столбец)
5 3
player 1's turn:
 .  .  .  .  .
 .  O  X  .  .
 .  .  X  O  .
 .  .  X  .  .
 .  .  O  .  .
player -1's turn:
 .  .  X  .  .
 .  O  X  .  .
 .  .  X  O  .
 .  .  X  .  .
 .  .  O  .  .
Победа (DQNAgent)!

player -1's turn:
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
Введите ваш ход (Строка, столбец)
1 1
player -1's turn:
 X  .  .  .  .
 .  .  .  .

In [None]:
# Save the reward and loss histories
with open(f'{PATH}data.pickle', 'wb') as f:
    pickle.dump((reward_values, loss_values), f)

In [None]:
# Save the current network weights and optimizer state under the PATH prefix
torch.save(agent.state_dict(), PATH + '30000_model')
torch.save(optimizer.state_dict(), PATH + '30000_opt')