In [None]:
import itertools
import numpy as np
import tensorflow as tf
import keras
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D, LeakyReLU, Activation
from keras.layers.normalization import BatchNormalization
from keras.optimizers import SGD

In [None]:
# Column letters of the 15x15 board; 'i' is skipped, so the columns run a..p.
POS_TO_LETTER = 'abcdefghjklmnop'
# Inverse lookup: column letter -> zero-based column index.
LETTER_TO_POS = {letter: pos for pos, letter in enumerate(POS_TO_LETTER)}

def to_move(pos):
    """Convert a flat board index into move notation such as ``'h8'``.

    The flat index is row-major: ``pos = row * 15 + column``, where the
    column selects the letter and ``row + 1`` is the number. This is the
    layout used by ``to_pos``/``to_coords`` and by ``label.ravel()``, so
    ``to_pos(to_move(pos)) == pos`` for every valid ``pos``.

    Args:
        pos: integer flat index in ``[0, 225)``.

    Returns:
        The move as a string: column letter followed by the 1-based row.

    Raises:
        IndexError: if ``pos`` is outside the board.
    """
    side = len(POS_TO_LETTER)
    if not 0 <= pos < side * side:
        raise IndexError('board position out of range: %r' % (pos,))
    # divmod yields (row, column); the column picks the letter, not the row.
    row, column = divmod(pos, side)
    return POS_TO_LETTER[column] + str(row + 1)

def to_pos(move):
    """Convert move notation (letter + 1-based number) to a flat index.

    The number selects the row and the letter selects the column; the
    result is ``row * 15 + column``.
    """
    row = int(move[1:]) - 1
    column = LETTER_TO_POS[move[0]]
    return row * 15 + column

def to_coords(move):
    """Convert move notation (letter + 1-based number) to ``(row, column)``.

    The row is the number minus one; the column is the index of the letter
    in ``POS_TO_LETTER``.
    """
    row = int(move[1:]) - 1
    column = LETTER_TO_POS[move[0]]
    return row, column

In [None]:
PATH = "Data/"

# Read all training records up front; the context manager closes the handle
# as soon as the lines are in memory.
with open(PATH + "train-1.renju") as fd_train:
    data = fd_train.readlines()

# NOTE(review): this handle is never read or closed in the visible cells.
# It is kept open only in case another cell consumes it -- confirm, then
# either wrap it in `with` or drop it.
fd_test = open(PATH + "train-2.renju")

# Number of positions in which the eventual winner is to move, per colour.
black_states_n = 0
white_states_n = 0

for line in data:
    tokens = line.split()
    if not tokens:
        # Blank line: there is no winner token to inspect.
        continue
    moves_n = len(tokens) - 1  # tokens[0] is the winner, the rest are moves
    if tokens[0] == 'white':
        # White plays the even-numbered moves (2, 4, ...).
        white_states_n += moves_n // 2
    elif tokens[0] == 'black':
        # Black plays the odd-numbered moves (1, 3, ...).
        black_states_n += (moves_n + 1) // 2

In [None]:
def generator(data, batch_size = 70):
    """Yield training batches built from the winner's moves, forever.

    Each record in ``data`` is a whitespace-separated string: the winner
    (``'black'`` or ``'white'``) followed by the moves in play order. The
    board is replayed move by move, with the first player's stones stored
    as ``1`` and the second player's as ``-1``. Every time the eventual
    winner is about to move, the current board becomes a feature and a
    one-hot 15x15 map of the move they played becomes the label.

    Extra rotated copies are added in two ways:
      * deterministically, when the move lies on row 0, column 0 or row 14;
      * randomly, for roughly 30% of samples.

    A batch is yielded once at least ``batch_size`` samples are collected,
    so a batch may be slightly larger than ``batch_size``.

    Args:
        data: re-iterable collection of record strings.
        batch_size: minimum number of samples per yielded batch.

    Yields:
        ``(features, labels)`` with shapes ``(n, 15, 15, 1)`` and
        ``(n, 225)``.

    Raises:
        ValueError: if a full pass over ``data`` produces no sample; the
            loop would otherwise spin forever without yielding.
    """
    def add_rotated(train_features, train_labels, board, label, quarter_turns):
        # Rotate board and label together so the target stays aligned.
        train_features.append(np.copy(np.rot90(board, quarter_turns)))
        train_labels.append(np.copy(np.rot90(label, quarter_turns)).ravel())

    def augmentation(train_features, train_labels, coords, board, label):
        # Edge moves get one extra rotated copy; returns the number added.
        row, column = coords
        if row == 0:
            quarter_turns = 1
        elif column == 0:
            quarter_turns = 2
        elif row == 14:
            quarter_turns = 3
        else:
            return 0
        add_rotated(train_features, train_labels, board, label, quarter_turns)
        return 1

    def rand_augmentation(train_features, train_labels, board, label):
        # Random extra rotated copy; returns the number added.
        # randint's upper bound is exclusive, so num is in 0..999 and the
        # three buckets hold 100, 100 and 99 values respectively.
        num = np.random.randint(1000)
        if num <= 700:
            return 0
        if num <= 800:
            quarter_turns = 1
        elif num <= 900:
            quarter_turns = 2
        else:
            quarter_turns = 3
        add_rotated(train_features, train_labels, board, label, quarter_turns)
        return 1

    size = 0
    train_features = []
    train_labels = []
    while 1:
        produced_this_pass = 0
        for line in data:
            tokens = line.split()
            if not tokens:
                # Blank record: no winner token, nothing to replay.
                continue
            winner = tokens[0]
            cur = np.zeros((15, 15))
            color = 1
            for move in tokens[1:]:
                coords = to_coords(move)
                winner_to_move = (color == 1 and winner == 'black') or \
                                 (color == -1 and winner == 'white')
                if winner_to_move:
                    label = np.zeros((15, 15))
                    label[coords] = 1
                    train_features.append(np.copy(cur))
                    train_labels.append(label.ravel())
                    added = 1
                    added += augmentation(train_features, train_labels, coords, cur, label)
                    added += rand_augmentation(train_features, train_labels, cur, label)
                    size += added
                    produced_this_pass += added
                # Place the stone only after the sample was taken, so the
                # feature shows the position before the move.
                cur[coords] = color
                color = -color
                if size >= batch_size:
                    yield np.array(train_features).reshape((size, 15, 15, 1)),\
                        np.array(train_labels).reshape((size, 225))
                    train_features = []
                    train_labels = []
                    size = 0
        if produced_this_pass == 0:
            raise ValueError('generator: data produced no training samples '
                             '(empty, exhausted, or no black/white winners)')

In [None]:
def random_pos(line_len, eveness):
    """Pick a random 1-based move index of the requested parity.

    Args:
        line_len: number of moves in the game (indices run ``1..line_len``).
        eveness: ``0`` to pick an even index (2, 4, ...); any other value
            picks an odd index (1, 3, ...).

    Returns:
        An integer index in ``[1, line_len]`` with the requested parity,
        drawn uniformly from all such indices.

    Raises:
        ValueError: if no index of the requested parity exists.
    """
    if eveness == 0:
        first = 2
        choices = line_len // 2
    else:
        first = 1
        choices = (line_len + 1) // 2
    if choices <= 0:
        raise ValueError('no move index of the requested parity in a game '
                         'of %r moves' % (line_len,))
    # Draw the k-th index of that parity directly. The earlier form
    # `2 * r % m + 1` parsed as `((2 * r) % m) + 1` and lost the parity.
    return first + 2 * np.random.randint(0, choices)


def shuffle_gen(data, batch_size = 70):
    """Yield batches built from one randomly chosen position per game, forever.

    For each record with more than three tokens and a ``'black'`` or
    ``'white'`` winner, a move index is drawn with ``random_pos`` (parity
    flag 1 for black, 0 for white). The board is replayed up to, but not
    including, that move, and the move itself becomes the one-hot label.
    The board/label pair is added in all four 90-degree rotations.

    Yields:
        ``(features, labels)`` with shapes ``(batch_size * 4, 15, 15, 1)``
        and ``(batch_size * 4, 225)``, once ``batch_size`` games have been
        collected.
    """
    parity_by_winner = {'black': 1, 'white': 0}
    games_collected = 0
    boards = []
    targets = []
    while True:
        for record in data:
            tokens = record.split()
            if len(tokens) <= 3:
                continue
            parity = parity_by_winner.get(tokens[0])
            if parity is None:
                # Neither black nor white won: skip the record.
                continue
            pos = random_pos(len(tokens) - 1, parity)

            # Replay every move that precedes the chosen one.
            board = np.zeros((15, 15))
            stone = 1
            for move in tokens[1:pos]:
                row, column = to_coords(move)
                board[row, column] = stone
                stone = -stone

            target = np.zeros((15, 15))
            target[to_coords(tokens[pos])] = 1

            # Original orientation plus three successive quarter turns.
            for quarter_turns in range(4):
                boards.append(np.copy(np.rot90(board, quarter_turns)))
                targets.append(np.copy(np.rot90(target, quarter_turns)).ravel())

            games_collected += 1
            if games_collected < batch_size:
                continue

            yield np.array(boards).reshape((batch_size * 4, 15, 15, 1)),\
                np.array(targets).reshape((batch_size * 4, 225))
            boards = []
            targets = []
            games_collected = 0

In [None]:
def policy(data, states_n):
    """Build, train and return the policy network.

    The model is a stack of 'same'-padded convolutions with LeakyReLU
    activations on a ``(15, 15, 1)`` input: a 16-filter stem followed by
    three-layer stages of 32, 64 and 128 filters, each stage ending in batch
    normalisation. A 1x1 convolution reduces the result to a single map,
    which is flattened and passed through softmax.

    Training runs five single-epoch rounds with batch sizes 100, 150, 200,
    250 and 300, and saves the model to ``policy<i>.h5`` after round ``i``.

    Args:
        data: game records passed to ``generator``.
        states_n: number of training states, used to size each epoch.

    Returns:
        The trained Keras model.
    """
    model = Sequential()

    # Stem: two 2x2 convolutions, one 3x3, then batch normalisation.
    model.add(Conv2D(16, (2, 2), padding='same', input_shape=(15, 15, 1)))
    model.add(LeakyReLU())
    model.add(Conv2D(16, (2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(Conv2D(16, (3, 3), padding='same'))
    model.add(LeakyReLU())
    model.add(BatchNormalization(axis=3))

    # Three identical stages that differ only in filter count.
    for filters in (32, 64, 128):
        for _ in range(3):
            model.add(Conv2D(filters, (3, 3), padding='same'))
            model.add(LeakyReLU())
        model.add(BatchNormalization(axis=3))

    # Head: collapse to one channel and normalise over the flattened map.
    model.add(Conv2D(1, (1, 1), padding='same'))
    model.add(LeakyReLU())
    model.add(Flatten())
    model.add(Activation('softmax'))

    model.compile(loss='categorical_crossentropy', optimizer = 'adam', metrics=['accuracy'])
    model.summary()

    for i in range(5):
        batch_sz = 100 + 50 * i
        # steps_per_epoch is documented as an integer; the floor division on
        # a float would otherwise hand Keras a float.
        steps = int(1.3 * states_n // batch_sz)
        model.fit_generator(generator(data, batch_sz), steps_per_epoch = steps, epochs=1)
        # Saved under a policy-specific name: the previous 'rollout<i>.h5'
        # name is also used by rollout(), which overwrote these checkpoints.
        model.save('policy' + str(i) + '.h5')

    return model

In [None]:
def rollout(data, states_n):
    """Build, train and return the rollout network.

    The model is a stack of 'same'-padded convolutions with LeakyReLU
    activations on a ``(15, 15, 1)`` input: a 16-filter stem with batch
    normalisation, three 32-filter layers without normalisation, then two
    64-filter and two 128-filter layers, each pair followed by batch
    normalisation. A 1x1 convolution reduces the result to a single map,
    which is flattened and passed through softmax.

    Training runs five single-epoch rounds with batch size 125 and saves
    the model to ``rollout<i>.h5`` after round ``i``.

    Args:
        data: game records passed to ``generator``.
        states_n: number of training states, used to size each epoch.

    Returns:
        The trained Keras model.
    """
    model = Sequential()

    # Stem: two 2x2 convolutions, one 3x3, then batch normalisation.
    model.add(Conv2D(16, (2, 2), padding='same', input_shape=(15, 15, 1)))
    model.add(LeakyReLU())
    model.add(Conv2D(16, (2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(Conv2D(16, (3, 3), padding='same'))
    model.add(LeakyReLU())
    model.add(BatchNormalization(axis=3))

    # (filters, number of conv layers, whether the stage ends in batch norm)
    stages = ((32, 3, False), (64, 2, True), (128, 2, True))
    for filters, depth, normalise in stages:
        for _ in range(depth):
            model.add(Conv2D(filters, (3, 3), padding='same'))
            model.add(LeakyReLU())
        if normalise:
            model.add(BatchNormalization(axis=3))

    # Head: collapse to one channel and normalise over the flattened map.
    model.add(Conv2D(1, (1, 1), padding='same'))
    model.add(LeakyReLU())
    model.add(Flatten())
    model.add(Activation('softmax'))

    model.compile(loss='categorical_crossentropy', optimizer = 'adam', metrics=['accuracy'])
    model.summary()

    batch_sz = 125
    # steps_per_epoch is documented as an integer; the floor division on a
    # float would otherwise hand Keras a float.
    steps = int(1.3 * states_n // batch_sz)
    for i in range(5):
        model.fit_generator(generator(data, batch_sz), steps_per_epoch = steps, epochs=1)
        model.save('rollout' + str(i) + '.h5')

    return model

In [None]:
# Train both networks on the same records; each epoch is sized from the
# combined black and white state counts.
policy_net = policy(data, states_n=black_states_n + white_states_n)
rollout_net = rollout(data, states_n=black_states_n + white_states_n)