Реализуйте нейронную сеть для метода DQN на доске для крестиков-ноликов. Не буду ограничивать фантазию, но кажется, что свёртки 3х3 здесь должны неплохо работать (в том числе обобщаться на доски размера побольше).
Реализуйте DQN с нейронной сетью, обучите стратегии крестиков и ноликов. Замечание: скорее всего, experience replay потребуется сразу же.
Реализуйте Double DQN и/или Dueling DQN.

In [1]:
import gym
from gym import make
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch.optim import Adam
from collections import deque
import random
import os
import copy

In [2]:
N_ROWS, N_COLS, N_WIN = 3, 3, 3

class TicTacToe(gym.Env):
    def __init__(self, n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN):
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.n_win = n_win

        self.board = np.zeros((self.n_rows, self.n_cols), dtype=int)
        self.gameOver = False
        # ход первого игрока
        self.curTurn = 1
        self.emptySpaces = None
        
        self.reset()

    def getEmptySpaces(self):
        if self.emptySpaces is None:
            res = np.where(self.board == 0)
            self.emptySpaces = np.array([ (i, j) for i,j in zip(res[0], res[1]) ])
        return self.emptySpaces

    def makeMove(self, player, i, j):
        self.board[i, j] = player
        self.emptySpaces = None

    def _check_terminal(self, cur_p):
        cur_marks = np.where(self.board == cur_p)
        for i,j in zip(cur_marks[0], cur_marks[1]):
            if i <= self.n_rows - self.n_win:
                if np.all(self.board[i:i+self.n_win, j] == cur_p):
                    return True
            if j <= self.n_cols - self.n_win:
                if np.all(self.board[i,j:j+self.n_win] == cur_p):
                    return True
            if i <= self.n_rows - self.n_win and j <= self.n_cols - self.n_win:
                if np.all(np.array([ self.board[i+k,j+k] == cur_p for k in range(self.n_win) ])):
                    return True
            if i <= self.n_rows - self.n_win and j >= self.n_win-1:
                if np.all(np.array([ self.board[i+k,j-k] == cur_p for k in range(self.n_win) ])):
                    return True
        return False
    
    def isTerminal(self):
        # проверим, не закончилась ли игра
        cur_win = self._check_terminal(self.curTurn)
        if cur_win:
                self.gameOver = True
                return self.curTurn
            
        if len(self.getEmptySpaces()) == 0:
            self.gameOver = True
            return 0

        self.gameOver = False
        return None

    def getWinner(self):
        # фактически запускаем isTerminal два раза для крестиков и ноликов
        if self._check_terminal(1):
            return 1
        if self._check_terminal(-1):
            return -1
        if len(self.getEmptySpaces()) == 0:
            return 0
        return None
    
    def printBoard(self):
        for i in range(0, self.n_rows):
            print('----'*(self.n_cols)+'-')
            out = '| '
            for j in range(0, self.n_cols):
                if self.board[i, j] == 1:
                    token = 'x'
                if self.board[i, j] == -1:
                    token = 'o'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('----'*(self.n_cols)+'-')

    def getState(self):
        return self.board
#         return (self.board, self.getEmptySpaces(), self.curTurn)

    def action_from_int(self, action_int):
        return int(action_int / self.n_cols), int(action_int % self.n_cols)

    def int_from_action(self, action):
        return action[0] * self.n_cols + action[1]
    
    def sample_action(self):
        self.getEmptySpaces()
        idx = np.random.randint(low=0, high=len(self.emptySpaces))
        return self.int_from_action(self.emptySpaces[idx])
    
    def step(self, action):
        action = self.action_from_int(action)
        if self.board[action[0], action[1]] != 0:
            return self.getState(), -10, True, {}, {}
        self.makeMove(self.curTurn, action[0], action[1])
        reward = self.isTerminal()
        self.curTurn = -self.curTurn
        return self.getState(), 0 if reward is None else reward, reward is not None, {}, {}

    def reset(self):
        self.board = np.zeros((self.n_rows, self.n_cols), dtype=int)
        self.gameOver = False
        self.emptySpaces = None
        self.curTurn = 1
        return self.board, {}

In [3]:
class DQN:
    def __init__(self, state_dim, action_dim):
        self.steps = 0
        self.model = NeuralNetwork(state_dim, action_dim)
        self.model_target = copy.deepcopy(self.model)
        self.replay_buffer = deque(maxlen=REPLAY_BUFFER_MAX_LEN)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        self.model_target.to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=LEARNING_RATE)


    def consume_transition(self, transition):
        self.replay_buffer.append(transition)

    def sample_batch(self):
        states = []
        actions = []
        next_states = []
        rewards = []
        dones = []
        for _ in range(BATCH_SIZE):
            state, action, next_state, reward, done = random.choice(self.replay_buffer)
            states.append(state)
            actions.append(action)
            next_states.append(next_state)
            rewards.append(reward)
            dones.append(done)
            
        return torch.Tensor(np.array(states)[:, np.newaxis, :, :]).to(self.device), \
            torch.Tensor(np.array(actions)).long(), \
            torch.Tensor(np.array(next_states)[:, np.newaxis, :, :]).to(self.device), \
            torch.Tensor(np.array(rewards)).to(self.device), \
            torch.Tensor(np.array(dones)).to(self.device)

    def train_step(self, batch):
        states, actions, next_states, rewards, dones = batch
        out = self.model.forward(states)
        with torch.no_grad():
            out_target = self.model_target.forward(next_states)

        max_a_target = out_target.max(dim=1)[0]
        loss = torch.nn.functional.mse_loss(out[torch.arange(BATCH_SIZE), actions], rewards + GAMMA * max_a_target * (1 - dones).float())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
    def update_target_network(self):
        self.model_target = copy.deepcopy(self.model)

    def act(self, state, actions):
        state = np.array(state)[np.newaxis, np.newaxis, :, :]
        with torch.no_grad():
            out = self.model.forward(torch.Tensor(state).to(self.device))
        out = out.detach().cpu().numpy()
        
        mask = np.zeros(out.shape, dtype=bool)
        mask[:, actions] = 1
        
        out[~mask] = -1e9
        return out.argmax()

    def update(self, transition):
        self.consume_transition(transition)
        if self.steps % STEPS_PER_UPDATE == 0:
            batch = self.sample_batch()
            self.train_step(batch)
        if self.steps % STEPS_PER_TARGET_UPDATE == 0:
            self.update_target_network()
        self.steps += 1

    def save(self):
        torch.save(self.model.state_dict(), "agent.pkl")


def evaluate_policy(agent, episodes=5, crosses=True):
    max_steps = 1000
    env = TicTacToe(n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN)
    returns = []
    for _ in range(episodes):
        done = False
        state, _ = env.reset()
        if not crosses:
            possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
            state, reward, done, _, _ = env.step(agent.act(state, possible_actions))

        total_reward = 0.        
        steps = 0
        while not done and steps < max_steps:
            possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
            state, reward, done, _, _ = env.step(agent.act(state, possible_actions))
            # ход соперника
            if not done:
                state, reward, done, _, _ = env.step(env.sample_action())
                
            total_reward += reward
            steps += 1
        returns.append(total_reward)
    return returns

In [4]:
class NeuralNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(self.__class__, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=hidden_dim, kernel_size=(3, 3)),
            nn.Flatten(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()
        )
        
    def forward(self, inp):       
        out = self.model(inp)
        return out

# Учим крестики

In [5]:
GAMMA = 0.98
INITIAL_STEPS = 1024
TRANSITIONS = 1000
STEPS_PER_UPDATE = 4
STEPS_PER_TARGET_UPDATE = STEPS_PER_UPDATE * 1000
BATCH_SIZE = 128
LEARNING_RATE = 1e-3
REPLAY_BUFFER_MAX_LEN = 1000

SEED = 13
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)

env = TicTacToe()
dqn = DQN(state_dim=(N_ROWS, N_COLS), action_dim=N_ROWS * N_COLS)
eps = 0.1
state, _ = env.reset()

for _ in range(INITIAL_STEPS):
    action = env.sample_action()
    
    next_state, reward, done, _, _ = env.step(action)
    # ход ноликов
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.consume_transition((state, action, next_state, reward, done))

    state = next_state if not done else env.reset()[0]
    
for i in range(TRANSITIONS):
    if random.random() < eps:
        action = env.sample_action()
    else:
        possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
        action = dqn.act(state, possible_actions)

    next_state, reward, done, _, _ = env.step(action)
    # ход ноликов
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.update((state, action, next_state, reward, done))

    state = next_state if not done else env.reset()[0]

    if (i + 1) % (TRANSITIONS//100) == 0:
        rewards = evaluate_policy(dqn, 500)
        print(f"Step: {i+1}, Reward mean: {np.mean(rewards)}, Reward std: {np.std(rewards)}")
        dqn.save()

Step: 10, Reward mean: 0.164, Reward std: 0.9365383067445772
Step: 20, Reward mean: 0.188, Reward std: 0.9298688079508851
Step: 30, Reward mean: 0.196, Reward std: 0.8795362414363607
Step: 40, Reward mean: 0.294, Reward std: 0.8553151465980244
Step: 50, Reward mean: 0.288, Reward std: 0.8631662644010133
Step: 60, Reward mean: 0.242, Reward std: 0.8575756526394626
Step: 70, Reward mean: 0.278, Reward std: 0.8250551496718264
Step: 80, Reward mean: 0.336, Reward std: 0.8069101560892638
Step: 90, Reward mean: 0.318, Reward std: 0.8251521071899411
Step: 100, Reward mean: 0.136, Reward std: 0.8611062652193399
Step: 110, Reward mean: 0.114, Reward std: 0.8677580307896897
Step: 120, Reward mean: 0.138, Reward std: 0.8825848401145353
Step: 130, Reward mean: 0.17, Reward std: 0.8491760712596652
Step: 140, Reward mean: 0.182, Reward std: 0.8560817717951948
Step: 150, Reward mean: 0.222, Reward std: 0.8465908102501467
Step: 160, Reward mean: 0.214, Reward std: 0.8626725914273619
Step: 170, Reward 

# Учим нолики

In [6]:
GAMMA = 0.98
INITIAL_STEPS = 1024
TRANSITIONS = 2000
STEPS_PER_UPDATE = 4
STEPS_PER_TARGET_UPDATE = STEPS_PER_UPDATE * 1000
BATCH_SIZE = 128
LEARNING_RATE = 1e-3
REPLAY_BUFFER_MAX_LEN = 1000

SEED = 13
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)

env = TicTacToe()
dqn = DQN(state_dim=(N_ROWS, N_COLS), action_dim=N_ROWS * N_COLS)
eps = 0.1
env.reset()

state, reward, done, _, _ = env.step(env.sample_action())
for _ in range(INITIAL_STEPS):
    action = env.sample_action()
    
    next_state, reward, done, _, _ = env.step(action)
    # ход крестиков
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.consume_transition((state, action, next_state, -reward, done))
    
    if done:
        env.reset()
        state, reward, done, _, _ = env.step(env.sample_action())
    else:
        state = next_state
        
for i in range(TRANSITIONS):
    if random.random() < eps:
        action = env.sample_action()
    else:
        possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
        action = dqn.act(state, possible_actions)

    next_state, reward, done, _, _ = env.step(action)
    # ход крестиков
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.update((state, action, next_state, -reward, done))

    if done:
        env.reset()
        state, reward, done, _, _ = env.step(env.sample_action())
    else:
        state = next_state

    if (i + 1) % (TRANSITIONS//100) == 0:
        rewards = evaluate_policy(dqn, 500, crosses=False)
        print(f"Step: {i+1}, Reward mean: {np.mean(rewards)}, Reward std: {np.std(rewards)}")
        dqn.save()

Step: 20, Reward mean: 0.344, Reward std: 0.7678958262681208
Step: 40, Reward mean: 0.324, Reward std: 0.9268354762308141
Step: 60, Reward mean: 0.166, Reward std: 0.9687331934026003
Step: 80, Reward mean: 0.238, Reward std: 0.9577870326956821
Step: 100, Reward mean: 0.412, Reward std: 0.8162450612408017
Step: 120, Reward mean: 0.236, Reward std: 0.862730548896931
Step: 140, Reward mean: 0.242, Reward std: 0.8691582134456305
Step: 160, Reward mean: 0.272, Reward std: 0.8209847745238641
Step: 180, Reward mean: 0.18, Reward std: 0.8964373932405988
Step: 200, Reward mean: 0.102, Reward std: 0.91847482273604
Step: 220, Reward mean: 0.26, Reward std: 0.8440379138403676
Step: 240, Reward mean: 0.2, Reward std: 0.8579044235810886
Step: 260, Reward mean: 0.084, Reward std: 0.7516275673496816
Step: 280, Reward mean: 0.044, Reward std: 0.7733459769081364
Step: 300, Reward mean: -0.094, Reward std: 0.8106565240593577
Step: 320, Reward mean: -0.078, Reward std: 0.7693607736296413
Step: 340, Reward

# Учим крестики, 4x4x4

In [7]:
class NeuralNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(self.__class__, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=hidden_dim, kernel_size=(3, 3)),
            nn.Flatten(),
            nn.Linear(4 * hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()
        )
        
    def forward(self, inp):       
        out = self.model(inp)
        return out

In [8]:
N_ROWS, N_COLS, N_WIN = 4, 4, 4

GAMMA = 0.98
INITIAL_STEPS = 1024
TRANSITIONS = 2000
STEPS_PER_UPDATE = 4
STEPS_PER_TARGET_UPDATE = STEPS_PER_UPDATE * 1000
BATCH_SIZE = 128
LEARNING_RATE = 1e-3
REPLAY_BUFFER_MAX_LEN = 1000

SEED = 13
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)

env = TicTacToe(n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN)
dqn = DQN(state_dim=(N_ROWS, N_COLS), action_dim=N_ROWS * N_COLS)
eps = 0.1
state, _ = env.reset()

for _ in range(INITIAL_STEPS):
    action = env.sample_action()
    
    next_state, reward, done, _, _ = env.step(action)
    # ход ноликов
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.consume_transition((state, action, next_state, reward, done))

    state = next_state if not done else env.reset()[0]
    
for i in range(TRANSITIONS):
    if random.random() < eps:
        action = env.sample_action()
    else:
        possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
        action = dqn.act(state, possible_actions)

    next_state, reward, done, _, _ = env.step(action)
    # ход ноликов
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.update((state, action, next_state, reward, done))

    state = next_state if not done else env.reset()[0]

    if (i + 1) % (TRANSITIONS//100) == 0:
        rewards = evaluate_policy(dqn, 500)
        print(f"Step: {i+1}, Reward mean: {np.mean(rewards)}, Reward std: {np.std(rewards)}")
        dqn.save()

Step: 20, Reward mean: -0.196, Reward std: 0.6399874998779274
Step: 40, Reward mean: -0.244, Reward std: 0.7017577929741856
Step: 60, Reward mean: -0.134, Reward std: 0.7563359042118786
Step: 80, Reward mean: -0.214, Reward std: 0.7824346618088951
Step: 100, Reward mean: -0.008, Reward std: 0.67818581524535
Step: 120, Reward mean: 0.036, Reward std: 0.7005026766544151
Step: 140, Reward mean: 0.086, Reward std: 0.6860058308790093
Step: 160, Reward mean: 0.072, Reward std: 0.7367604766815331
Step: 180, Reward mean: 0.312, Reward std: 0.8041492398802599
Step: 200, Reward mean: 0.28, Reward std: 0.8009993757800314
Step: 220, Reward mean: 0.218, Reward std: 0.7761932749000084
Step: 240, Reward mean: 0.228, Reward std: 0.7874109473457935
Step: 260, Reward mean: 0.3, Reward std: 0.7549834435270749
Step: 280, Reward mean: 0.36, Reward std: 0.7552483035399683
Step: 300, Reward mean: 0.404, Reward std: 0.7699246716400249
Step: 320, Reward mean: 0.422, Reward std: 0.7509434066559211
Step: 340, Re

# Учим нолики 4x4x4

In [9]:
GAMMA = 0.98
INITIAL_STEPS = 1024
TRANSITIONS = 2000
STEPS_PER_UPDATE = 4
STEPS_PER_TARGET_UPDATE = STEPS_PER_UPDATE * 1000
BATCH_SIZE = 128
LEARNING_RATE = 1e-3
REPLAY_BUFFER_MAX_LEN = 1000

SEED = 13
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)

env = TicTacToe(n_rows=N_ROWS, n_cols=N_COLS, n_win=N_WIN)
dqn = DQN(state_dim=(N_ROWS, N_COLS), action_dim=N_ROWS * N_COLS)
eps = 0.1
env.reset()

state, reward, done, _, _ = env.step(env.sample_action())
for _ in range(INITIAL_STEPS):
    action = env.sample_action()
    
    next_state, reward, done, _, _ = env.step(action)
    # ход крестиков
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.consume_transition((state, action, next_state, -reward, done))
    
    if done:
        env.reset()
        state, reward, done, _, _ = env.step(env.sample_action())
    else:
        state = next_state
        
for i in range(TRANSITIONS):
    if random.random() < eps:
        action = env.sample_action()
    else:
        possible_actions = list(map(env.int_from_action, env.getEmptySpaces()))
        action = dqn.act(state, possible_actions)

    next_state, reward, done, _, _ = env.step(action)
    # ход крестиков
    if not done:
        next_state, reward, done, _, _ = env.step(env.sample_action())
    
    dqn.update((state, action, next_state, -reward, done))

    if done:
        env.reset()
        state, reward, done, _, _ = env.step(env.sample_action())
    else:
        state = next_state

    if (i + 1) % (TRANSITIONS//100) == 0:
        rewards = evaluate_policy(dqn, 500, crosses=False)
        print(f"Step: {i+1}, Reward mean: {np.mean(rewards)}, Reward std: {np.std(rewards)}")
        dqn.save()

Step: 20, Reward mean: 0.052, Reward std: 0.759799973677283
Step: 40, Reward mean: 0.044, Reward std: 0.768156234108661
Step: 60, Reward mean: 0.166, Reward std: 0.7144536374041355
Step: 80, Reward mean: 0.058, Reward std: 0.6592692924746306
Step: 100, Reward mean: -0.074, Reward std: 0.698944919145994
Step: 120, Reward mean: 0.01, Reward std: 0.7758221445666527
Step: 140, Reward mean: -0.006, Reward std: 0.7937027151270178
Step: 160, Reward mean: -0.064, Reward std: 0.7238121303211213
Step: 180, Reward mean: -0.242, Reward std: 0.7179387160475467
Step: 200, Reward mean: -0.07, Reward std: 0.7190966555338718
Step: 220, Reward mean: -0.042, Reward std: 0.764353321442381
Step: 240, Reward mean: -0.134, Reward std: 0.8050118011557346
Step: 260, Reward mean: -0.246, Reward std: 0.8059057016797933
Step: 280, Reward mean: -0.204, Reward std: 0.8236406983630665
Step: 300, Reward mean: -0.18, Reward std: 0.7896834808959854
Step: 320, Reward mean: -0.218, Reward std: 0.7684243619251019
Step: 34