In [1]:
# Import modules.
import torch
import random

import numpy as np

from hts import HTS
from network import H_GO
from torch import nn, optim
from collections import deque
from torchsummary import summary
from dataReader import DataReader

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Set training device.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("CUDA is available.")
else:
    device = torch.device("cpu")
    print("CUDA is not available, using CPU instead.")

CUDA is available.


In [3]:
# Set random seed.
random_seed = 0

random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [4]:
# Load data.
data_reader = DataReader(dir_path="../dataset/10k", load_num=256, train_ratio=0.8, augment=True)

Read raw data completed. (0.13 s)
Clear data completed. (0.00 s)
Convert data completed. (0.01 s)
Augment data completed. (0.03 s)
Make train data completed. (1.94 s)
Split train and test data completed. (316928 train data, 79232 test data).


In [5]:
# set model.
model = H_GO(input_size=8, output_size=361, hidden_dim=64)
model.to(device=device)

H_GO(
  (conv1): Conv2d(8, 64, kernel_size=(3, 3), stride=(1, 1))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (tanh): Tanh()
  (layers): Sequential(
    (0): Block(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
    )
    (1): Block(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
    

In [6]:
# Print model summary.
# summary(model, input_size=(8, 19, 19))

In [7]:
# Set hyperparameters.
epochs = 2
batch_size = 1024
data_num = 5000

In [8]:
# Set optimizer and loss function.
optimizer = optim.RAdam(params=model.parameters())
loss_fu_policy = nn.CrossEntropyLoss()
loss_fu_value = nn.BCELoss()

In [9]:
def supervised_training(epochs: int, model: H_GO, optimizer: optim.Adam, loss_fu_policy: nn.CrossEntropyLoss, loss_fu_value: nn.MSELoss) -> None:
    # Set model to training mode.
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        total_acc_policy = 0
        total_acc_value = 0

        for _ in range(data_num // batch_size):
            # Get training data batch.
            training_batch = data_reader.get_training_batch(batch_size=batch_size, shuffle=True)

            # Convert data to tensor.
            game_data = torch.tensor([x[0] for x in training_batch], dtype=torch.float).to(device=device)
            step = torch.tensor([x[1] for x in training_batch], dtype=torch.long).to(device=device)
            winner = torch.tensor([x[2] for x in training_batch], dtype=torch.float).to(device=device)

            # Get model output.
            policy, value = model(game_data)

            # Calculate loss.
            policy_loss = loss_fu_policy(policy, step)
            value_loss = loss_fu_value(value, winner)
            loss = policy_loss + value_loss

            # Calculate accuracy.
            total_loss += loss.item()
            print(policy.argmax(1))
            total_acc_policy += torch.sum(policy.argmax(1) == step).item()
            total_acc_value += torch.sum(torch.round(value) == winner).item()

            # Update model parameters.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(
            f"Epoch: {epoch} | "
            f"Loss of model: {total_loss:.3f} | "
            f"Accuracy of policy: {total_acc_policy / data_num * 100:.3f}% | "
            f"Accuracy of value: {total_acc_value / data_num * 100:.3f}%"
        )

    # Set model to evaluation mode.
    model.eval()

In [10]:
supervised_training(epochs=epochs, model=model, optimizer=optimizer, loss_fu_policy=loss_fu_policy, loss_fu_value=loss_fu_value)

  game_data = torch.tensor([x[0] for x in training_batch], dtype=torch.float).to(device=device)


tensor([147, 347, 200,  ..., 347, 192, 267], device='cuda:0')
tensor([  8, 192, 192,  ...,   8,  47,  36], device='cuda:0')
tensor([147, 347, 192,  ..., 236, 323, 222], device='cuda:0')
tensor([ 36, 105, 192,  ...,  36,   8, 355], device='cuda:0')
Epoch: 0 | Loss of model: 26.713 | Accuracy of policy: 0.220% | Accuracy of value: 35.540%
tensor([ 97, 192, 298,  ...,  74,  36, 222], device='cuda:0')
tensor([ 36, 100, 313,  ..., 192, 143, 147], device='cuda:0')
tensor([ 36,  36,  36,  ...,  36, 162,  35], device='cuda:0')
tensor([161, 211, 229,  ...,   8, 147, 355], device='cuda:0')
Epoch: 1 | Loss of model: 26.705 | Accuracy of policy: 0.120% | Accuracy of value: 36.500%


In [11]:
def play(model: H_GO) -> None:
    # Set model to evaluation mode.
    model.eval()

    # Set defualt value.
    player = -1
    game_board = np.zeros((19, 19))
    game_queue = deque(maxlen=7)

    for _ in range(7):
        game_queue.append(game_board.copy())

    while True:
        # Get model output.
        best_move = HTS(state=game_queue, depth=4, breadth=4, temperature=0.5, player=player, model=model, device=device).get_best_move()

        print(best_move)

        break

        # Select action.
        # if player == -1:

In [12]:
play(model=model)

192
