In [None]:
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm

from torch import nn

In [None]:
# --- Dataset layout -------------------------------------------------------
# Each dataset row has METAINFO_COLUMNS_COUNT meta columns, then
# INPUT_LAYER_SIZE feature columns, then one target column (see the shape used
# when the dataset array is allocated).
TARGET_COLUMN = 'target'
METAINFO_COLUMNS = ['stage', 'move_count', 'weight']
METAINFO_COLUMNS_COUNT = len(METAINFO_COLUMNS)
# Positional indices of the meta columns inside a row; they follow the order
# of METAINFO_COLUMNS above.
STAGE_INDEX = 0
MOVE_COUNT_INDEX = 1
WEIGHT_INDEX = 2
# The target is the last column of a row.
TARGET_INDEX = -1

# NOTE(review): presumably the 'stage' value of the initial position — not
# used by any code visible in this notebook; confirm before relying on it.
STARTPOS_STAGE = 24
# Number of network inputs.
# NOTE(review): presumably 64 squares x 12 piece planes — confirm against the
# dataset generator.
INPUT_LAYER_SIZE = 64 * 12

# Number of rows sampled (with replacement) for every training step.
BATCH_SIZE = 65536
# Fraction of the shuffled dataset used for training; the rest is held out.
TRAIN_SIZE = 0.9

In [None]:
# Read the CSV in chunks and keep every chunk as a compact int8 block.
# The blocks are concatenated once at the end: calling np.append inside the
# loop would re-copy the whole accumulated array on every iteration.
# The leading empty block fixes the expected column count (a mismatch raises
# in np.concatenate) and keeps `dataset` well-defined for an empty file.
# NOTE(review): every column, including 'move_count' and 'weight', is cast to
# int8 — values outside [-128, 127] would wrap around; confirm the CSV ranges.
dataset_chunks = [np.zeros((0, INPUT_LAYER_SIZE + METAINFO_COLUMNS_COUNT + 1), np.int8)]
for chunk in tqdm(pd.read_csv('/home/wind-eagle/quirky_data/dataset/01.csv', chunksize=125000)):
    dataset_chunks.append(chunk.values.astype(np.int8))
dataset = np.concatenate(dataset_chunks, axis=0)
del dataset_chunks  # release the per-chunk copies

# Shuffle rows in place, then treat the first TRAIN_SIZE fraction as the
# training split and the remainder as the held-out split.
# NOTE(review): the shuffle is unseeded, so the split differs on every run.
np.random.shuffle(dataset)
train_data_size = int(dataset.shape[0] * TRAIN_SIZE)

In [None]:
class ClippedReLU(nn.Module):
    """ReLU activation whose output is additionally capped from above.

    Values below zero become zero (plain ReLU) and values above
    ``clip_value`` become ``clip_value``.
    """

    def __init__(self, clip_value: float):
        super().__init__()
        # Upper bound applied after the rectification.
        self.clip_value = clip_value

    def forward(self, x):
        """Return ``x`` rectified at zero and clamped to at most ``clip_value``."""
        rectified = torch.relu(x)
        return rectified.clamp(max=self.clip_value)

In [None]:
class QNNE(nn.Module):
    """Small two-branch evaluation network with a sigmoid output.

    Sub-modules:
      * ``feature`` — Linear(INPUT_LAYER_SIZE, 16) + ClippedReLU(1.0), fed with
        all feature columns of a row.
      * ``pawns``   — Linear(INPUT_LAYER_SIZE, 16) + ClippedReLU(1.0), fed with
        a copy of the feature columns in which only the 64-wide planes 0, 5, 6
        and 11 are kept and every other plane is zeroed.
        NOTE(review): presumably these are the pawn and king planes of both
        sides — confirm against the feature encoder.
      * ``main``    — Linear(32, 8) + ClippedReLU(1.0) + Linear(8, 1) + Sigmoid,
        applied to the concatenated outputs of the two branches.
    """

    def __init__(self):
        super(QNNE, self).__init__()

        self.feature = nn.Sequential()
        self.pawns = nn.Sequential()
        self.main = nn.Sequential()

        self.feature.add_module('linear', nn.Linear(INPUT_LAYER_SIZE, 16))
        self.feature.add_module('clipped_relu1', ClippedReLU(1.0))

        self.pawns.add_module('linear', nn.Linear(INPUT_LAYER_SIZE, 16))
        self.pawns.add_module('clipped_relu', ClippedReLU(1.0))

        self.main.add_module('linear', nn.Linear(32, 8))
        self.main.add_module('clipped_relu', ClippedReLU(1.0))
        self.main.add_module('output', nn.Linear(8, 1))
        self.main.add_module('sigmoid', nn.Sigmoid())

    def forward(self, x):
        """Evaluate a batch of full dataset rows.

        Args:
            x: 2-D tensor whose rows are laid out as meta columns, feature
               columns, target column. The meta and target columns are sliced
               away here, so callers pass whole rows.

        Returns:
            1-D tensor with one sigmoid output per row.
        """
        first_feature = METAINFO_COLUMNS_COUNT

        def plane(index):
            # 64-column slice number `index` of the feature section.
            start = first_feature + 64 * index
            return x[:, start:start + 64]

        feature_input = self.feature(x[:, first_feature:TARGET_INDEX])

        # Padding for the planes the pawns branch must not see. `new_zeros`
        # inherits dtype and device from `x`; a bare `torch.zeros` would always
        # be a CPU tensor and break `torch.cat` for inputs on another device.
        blank_planes = x.new_zeros(x.shape[0], 64 * 4)
        pawns_data = torch.cat(
            (plane(0), blank_planes, plane(5), plane(6), blank_planes, plane(11)),
            dim=1,
        )
        pawns_input = self.pawns(pawns_data)

        # The final Linear has a single output; drop that axis.
        return self.main(torch.cat((feature_input, pawns_input), dim=1))[:, 0]


In [None]:
def get_loss(model, X, y, w):
    """Weighted mean squared error of ``model(X)`` against rescaled targets.

    The targets are transformed as ``(y + 1) / 2`` before comparison, and the
    squared errors are averaged with the per-sample weights ``w``.
    The model output must be 1-D (checked with an assertion).
    """
    target = (y + 1.0) / 2
    prediction = model(X)
    assert prediction.dim() == 1
    squared_error = (target - prediction) ** 2
    weighted_total = torch.sum(w * squared_error)
    return weighted_total / torch.sum(w)

In [None]:
def train_model(model, opt, iterations):
    """Train `model` for `iterations` mini-batch steps with optimizer `opt`.

    Every step samples BATCH_SIZE rows (with replacement) from the training
    part of the global `dataset`, takes one optimizer step on the weighted
    loss from `get_loss`, and then clamps the weights of `model.main.linear`
    and `model.main.output`. The loss on the held-out rows
    (`dataset[train_data_size:]`) is refreshed on the 1st, 51st, 101st, ...
    step. The progress bar shows the mean of the last 40 batch losses and the
    latest held-out loss.

    Args:
        model: network exposing `main.linear` and `main.output` layers.
        opt: optimizer constructed over `model`'s parameters.
        iterations: number of training steps.
    """
    # Bounds applied to the two `main` weight matrices after every step.
    # NOTE(review): presumably the int8 range [-128, 127] divided by a
    # quantization scale of 64 used by the consumer of the weights — confirm.
    weight_min = -128.0 / 64.0
    weight_max = 127.0 / 64.0

    history = []
    train_loss = np.inf
    test_loss = np.inf
    # The held-out rows never change during training, so they are converted to
    # float tensors only once, lazily, at the first evaluation.
    test_tensors = None

    progress = tqdm(np.arange(iterations))
    for _ in progress:
        indices = np.random.randint(0, train_data_size, BATCH_SIZE)
        # Gather the sampled rows once; target and weight are columns of the
        # same rows, so they are taken as views instead of re-indexing.
        X_batch = torch.tensor(dataset[indices], dtype=torch.float32)
        y_batch = X_batch[:, TARGET_INDEX]
        w_batch = X_batch[:, WEIGHT_INDEX]

        loss = get_loss(model, X_batch, y_batch, w_batch)

        loss.backward()

        opt.step()
        opt.zero_grad()

        with torch.no_grad():
            model.main.linear.weight.clamp_(min=weight_min, max=weight_max)
            model.main.output.weight.clamp_(min=weight_min, max=weight_max)

        # `.item()` yields a plain Python float and works on any device.
        history.append(loss.item())

        if len(history) % 50 == 1:
            if test_tensors is None:
                X_test = torch.tensor(dataset[train_data_size:], dtype=torch.float32)
                test_tensors = (X_test, X_test[:, TARGET_INDEX], X_test[:, WEIGHT_INDEX])
            # Evaluation only: no autograd graph is needed for the held-out
            # rows, which avoids keeping activations for the whole split.
            with torch.no_grad():
                test_loss = get_loss(model, *test_tensors).item()

        train_loss = np.mean(history[-40:])
        progress.set_description(f'Average batch loss = {train_loss:.6f}, test loss = {test_loss:.6f}')

In [None]:
def print_weights(f, arr):
    """Write a 2-D array to `f` in row-major order.

    Every element is formatted with five decimals and followed by a single
    space; no newlines are emitted.
    """
    text = "".join(
        "{0:0.5f} ".format(arr[row][col])
        for row in range(arr.shape[0])
        for col in range(arr.shape[1])
    )
    print(text, file=f, end="")

def print_biases(f, arr):
    """Write a 1-D array to `f`.

    Every element is formatted with five decimals and followed by a single
    space; no newlines are emitted.
    """
    text = "".join("{0:0.5f} ".format(arr[pos]) for pos in range(arr.shape[0]))
    print(text, file=f, end="")

def print_model(model, name):
    """Dump all weights and biases of `model` to the text file `name`.

    Sections are written back to back, in this order:
      1. transposed weights of `feature.linear` and `pawns.linear`, joined
         along the output axis (each input row is followed by the outputs of
         both branches);
      2. biases of `feature.linear` followed by those of `pawns.linear`;
      3. transposed weights, then biases, of `main.linear`;
      4. transposed weights, then biases, of `main.output`.
    Formatting of the individual values is delegated to `print_weights` and
    `print_biases`.
    """
    with open(name, 'w') as f:
        branch_layers = (model.feature.linear, model.pawns.linear)

        # Both branches share one "feature transformer" section of the file.
        transformer_weights = torch.cat(
            tuple(layer.weight.T.detach() for layer in branch_layers), dim=1
        ).numpy()
        transformer_biases = torch.cat(
            tuple(layer.bias.detach() for layer in branch_layers)
        ).numpy()
        print_weights(f, transformer_weights)
        print_biases(f, transformer_biases)

        # The two dense layers of the main network, in forward order.
        for layer in (model.main.linear, model.main.output):
            print_weights(f, layer.weight.T.detach().numpy())
            print_biases(f, layer.bias.detach().numpy())

In [None]:
# Fresh network with default-initialised layers; this global is the instance
# that the training and export cells below operate on.
model = QNNE()

In [None]:
def train_model_with_lr(lr, iterations):
    """Train the global `model` for `iterations` steps at learning rate `lr`.

    A new Adam optimizer is constructed on every call, so optimizer state is
    not carried over between calls.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    train_model(model, optimizer, iterations)

# Step-wise decaying learning-rate schedule: (learning rate, training steps).
# The stages run in order, each with its own freshly built optimizer.
LR_SCHEDULE = (
    (0.01, 10000),
    (0.005, 4000),
    (0.0025, 2500),
    (0.001, 1500),
    (0.0005, 1000),
    (0.00025, 500),
    (0.0001, 500),
)
for stage_lr, stage_iterations in LR_SCHEDULE:
    train_model_with_lr(stage_lr, stage_iterations)

In [None]:
# Export the trained weights as text to 'model.qnne' in the current working
# directory (an existing file of that name is overwritten).
print_model(model, 'model.qnne')