In [1]:
from copy import deepcopy

import numpy as np

from osero_learn import learn
from BitBoard import osero

In [2]:
# Strategies used for automated self-play: every entry of osero.PLAY_WAY except
# "human" (presumably the interactive player -- confirm in BitBoard.osero).
# A deep copy is edited so that osero's own table is left untouched.
PLAY_WAY = deepcopy(osero.PLAY_WAY)
PLAY_WAY.pop("human")
PLAY_WAY = PLAY_WAY.values()

# 64 per-square weights written as 8 rows of 8 values.
_SQUARE_WEIGHTS = (
     1.0, -0.6,  0.6,  0.4,  0.4,  0.6, -0.6,  1.0,
    -0.6, -0.8,  0.0,  0.0,  0.0,  0.0, -0.8, -0.6,
     0.6,  0.0,  0.8,  0.6,  0.6,  0.8,  0.0,  0.6,
     0.4,  0.0,  0.6,  0.0,  0.0,  0.6,  0.0,  0.4,
     0.4,  0.0,  0.6,  0.0,  0.0,  0.6,  0.0,  0.4,
     0.6,  0.0,  0.8,  0.6,  0.6,  0.8,  0.0,  0.6,
    -0.6, -0.8,  0.0,  0.0,  0.0,  0.0, -0.8, -0.6,
     1.0, -0.6,  0.6,  0.4,  0.4,  0.6, -0.6,  1.0,
)

# Two independent list copies of the same table
# (presumably one per colour -- TODO confirm against osero_learn.learn).
eva = [list(_SQUARE_WEIGHTS) for _ in range(2)]

In [3]:
# Number of full passes over every (black, white) strategy pairing.
N_ROUNDS = 10

data = []
result = []

# Turn numbers 1..60 handed to `learn`
# (presumably the turns at which positions are recorded -- confirm in osero_learn).
turn_vari = list(range(1, 61))
# NOTE(review): the two leading 0 arguments are placeholders here; the actual
# strategies are assigned per game below via black_method / white_method.
run = learn(0, 0, turn_vari, eva=eva)

for i in range(N_ROUNDS):
    # Fixed-width progress bar: `done` filled cells plus the remaining blanks
    # always add up to N_ROUNDS, so the bar never changes width between redraws.
    done = i + 1
    print("\r[" + "#" * done + " " * (N_ROUNDS - done) + "]", end="")
    for black in PLAY_WAY:
        for white in PLAY_WAY:
            run.setup()
            run.black_method = black
            run.white_method = white
            run.eva = eva
            data_ele, result_ele = run.play()
            # One sample per element returned by play(); each target is wrapped
            # in a 1-element list so `result` becomes an (N, 1) column.
            data.extend(data_ele)
            result.extend([target] for target in result_ele)

data = np.array(data).astype(np.float32)
result = np.array(result).astype(np.float32)

print("\r[" + "#" * N_ROUNDS + "]")

[##########]


In [4]:
from chainer.datasets import TupleDataset
from chainer.datasets import split_dataset_random
from chainer.iterators import SerialIterator

import chainer
import chainer.links as L
import chainer.functions as F

from chainer import optimizers
from chainer.optimizer_hooks import WeightDecay

from chainer.dataset import concat_examples

In [19]:
batch_size = 50

dataset = TupleDataset(data, result)

# First cut: 70 % of all samples go to train+validation, the rest to test.
n_train_val = int(len(dataset) * 0.7)
train_val, test = split_dataset_random(dataset, n_train_val, seed=0)

# Second cut: 70 % of train+validation is used for training, the rest for validation.
n_train = int(len(train_val) * 0.7)
train, valid = split_dataset_random(train_val, n_train, seed=0)

# Endless, shuffled minibatch stream over the training split.
train_iter = SerialIterator(
    train,
    batch_size=batch_size,
    repeat=True,
    shuffle=True,
)

In [20]:
class Net(chainer.Chain):
    """Fully connected network: two tanh hidden layers and a linear output layer.

    Args:
        n_in: size of the input vector.
        n_hidden: width of both hidden layers.
        n_out: size of the output vector.
    """

    def __init__(self, n_in, n_hidden, n_out):
        super().__init__()
        with self.init_scope():
            self.l1 = L.Linear(n_in, n_hidden)
            self.l2 = L.Linear(n_hidden, n_hidden)
            # Attribute name `l5` is kept as-is so parameter names stay stable.
            self.l5 = L.Linear(n_hidden, n_out)

    def forward(self, x):
        """Return the network output for input batch `x` (no output activation)."""
        hidden = x
        for layer in (self.l1, self.l2):
            hidden = F.tanh(layer(hidden))
        return self.l5(hidden)

In [21]:
# Input and hidden widths both equal the length of one sample; single output.
n_features = len(data[0])
net = Net(n_features, n_features, 1)

optimizer = optimizers.SGD(lr=0.01)
optimizer.setup(net)

# Apply weight decay to every parameter that is not named "b" (i.e. skip biases).
for param in (p for p in net.params() if p.name != "b"):
    param.update_rule.add_hook(WeightDecay(0.00001))

In [22]:
n_epoch = 50

# Per-epoch metric histories (plain Python floats, one entry per epoch).
results_train = {
    "MSE": [],
    "MAE": []
}
results_valid = {
    "MSE": [],
    "MAE": []
}

train_iter.reset()

# The validation split never changes, so collate it into arrays once.
x_valid, t_valid = concat_examples(valid)

for epoch in range(n_epoch):
    print("\r%d/%d" % (epoch + 1, n_epoch), end="")

    # Minibatch losses collected over this epoch; their mean is what gets logged,
    # so the train curve reflects the whole epoch rather than only its last batch.
    batch_mse = []
    batch_mae = []

    while True:
        train_batch = train_iter.next()

        x_train, t_train = concat_examples(train_batch)

        y_train = net(x_train)
        MSE_train = F.mean_squared_error(y_train, t_train)
        MAE_train = F.mean_absolute_error(y_train, t_train)

        # cleargrads() replaces the deprecated zerograds().
        net.cleargrads()
        MSE_train.backward()

        optimizer.update()

        batch_mse.append(float(MSE_train.array))
        batch_mae.append(float(MAE_train.array))

        # The iterator repeats forever; is_new_epoch marks the last batch of a pass.
        if train_iter.is_new_epoch:
            break

    # Evaluate on the full validation split without building a backward graph.
    with chainer.using_config("train", False), chainer.using_config("enable_backprop", False):
        y_valid = net(x_valid)
        MSE_valid = F.mean_squared_error(y_valid, t_valid)
        MAE_valid = F.mean_absolute_error(y_valid, t_valid)

    results_train["MSE"].append(sum(batch_mse) / len(batch_mse))
    results_train["MAE"].append(sum(batch_mae) / len(batch_mae))
    results_valid["MSE"].append(float(MSE_valid.array))
    results_valid["MAE"].append(float(MAE_valid.array))

50/50

In [23]:
import matplotlib.pyplot as plt


def plot(train, valid, xlabel, ylabel, fig_name, save_dir):
    """Draw train and validation curves on one figure and save it to disk.

    Args:
        train: sequence of values plotted as the "train" curve.
        valid: sequence of values plotted as the "valid" curve.
        xlabel: label of the x axis.
        ylabel: label of the y axis.
        fig_name: figure title; also used as the output file name.
        save_dir: directory the figure is written to. It is created if missing.

    Returns:
        None. The figure is closed after saving, so nothing is displayed inline.
    """
    import os  # local import so the function works without touching the import cells

    # savefig does not create missing directories; do it up front.
    # An empty save_dir means "current directory", which needs no creation.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.plot(train, label="train")
    ax.plot(valid, label="valid")
    ax.legend()
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(fig_name)
    fig.savefig(os.path.join(save_dir, fig_name))
    # Close this specific figure so repeated calls do not accumulate open figures.
    plt.close(fig)

In [24]:
# (metric key, y-axis label, figure title / file name) for each curve to save.
_CURVES = (
    ("MSE", "mean squared error", "mean squared error of each epoch"),
    ("MAE", "mean absolute error", "mean absolute error of each epoch"),
)

for metric_key, y_label, title in _CURVES:
    plot(
        results_train[metric_key],
        results_valid[metric_key],
        "epoch",
        y_label,
        title,
        "fig"
    )

In [25]:
# Collate the held-out test split into one input array and one target array.
x_test, t_test = concat_examples(test)

# Inference only: switch off train mode and graph construction for backprop.
with chainer.using_config("train", False):
    with chainer.using_config("enable_backprop", False):
        y_test = net(x_test)
        MSE_test = F.mean_squared_error(y_test, t_test)
        MAE_test = F.mean_absolute_error(y_test, t_test)

for template, metric in (
    ("test MSE: {:.4f}", MSE_test),
    ("test MAE: {:.4f}", MAE_test),
):
    print(template.format(metric.array))

test MSE: 371.1285
test MAE: 14.7923
