### About version

In [None]:
version_name = "model_3_3"

### Import libraries

In [None]:
# Import libraries
import torch
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch dataset
from torch.utils.data.sampler import SubsetRandomSampler

# PyTorch model
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau


### Load Dataset

In [None]:
df = open('./Training Dataset/play_style_train.csv').read().splitlines()
# df = df[:25000]

games = [i.split(',', 2)[-1] for i in df]
game_styles = [int(i.split(',', 2)[-2]) for i in df]

In [None]:
print(games[0])
print(game_styles[:10])

### Data processing

In [None]:
# Set up coordinate
chars = 'abcdefghijklmnopqrs'
coordinates = {k: v for v, k in enumerate(chars)}

In [None]:
def convert_game(moves):
    x = np.zeros((4, 19, 19))
    last_move_color = moves[-1][0]

    for move in moves:
        color = move[0]
        column = coordinates[move[2]]
        row = coordinates[move[3]]
        column = coordinates[move[2]]
        row = coordinates[move[3]]
        x[0, row, column] = 1

        if (color == last_move_color):
            x[2, row, column] = 1
        else:
            x[1, row, column] = 1

    last_move_column = coordinates[moves[-1][2]]
    last_move_row = coordinates[moves[-1][3]]
    x[3, row, column] = 1

    return x

In [None]:
# Convert y into an one-hot array
def one_hot_encoding(value):
    one_hot = torch.eye(3)[value]
    one_hot = np.array(one_hot)
    return one_hot

In [None]:
x = []
for game in games:
    moves_list = game.split(',')
    x.append(convert_game(moves_list))
x = np.array(x)

Y = []
y = np.array(game_styles) - 1
for yi in y:
    Y.append(one_hot_encoding(yi))

### Data Transform

In [None]:
# flip and rotate the image to increase the training data
def flip_rotate(feature: np.ndarray, target: np.ndarray):
    a0 = np.array(feature)
    a1 = np.rot90(a0, axes=(2, 3))
    a2 = np.rot90(a1, axes=(2, 3))
    a3 = np.rot90(a2, axes=(2, 3))
    b0 = np.flip(a0, axis=3)
    b1 = np.rot90(b0, axes=(2, 3))
    b2 = np.rot90(b1, axes=(2, 3))
    b3 = np.rot90(b2, axes=(2, 3))

    c = np.concatenate((a0, a1, a2, a3, b0, b1, b2, b3), axis=0)

    d = np.concatenate((target, target, target, target,
                       target, target, target, target), axis=0)
    return c, d


In [None]:
x,y = flip_rotate(x, Y)

### Load Data

In [None]:
# number of subprocesses to use for data loading
num_workers = 0
# how many samples per batch to load
batch_size = 64
# percentage of training set to use as validation
valid_size = 0.1

In [None]:
# obtain training indices that will be used for validation
num_batch = len(x) // batch_size
num_train = num_batch * batch_size
indices = list(range(num_train))
np.random.shuffle(indices)
split = int(np.floor(valid_size * num_batch)) * batch_size
train_idx, valid_idx = indices[split:], indices[:split]
print(len(x), batch_size, num_batch, num_train, split)

In [None]:
# define samplers for obtaining training and validation batches
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)
print(len(train_sampler), len(valid_sampler))

In [None]:
# prepare data loaders (combine dataset and sampler)
x = torch.Tensor(x)
y = torch.Tensor(y)

dataset = torch.utils.data.TensorDataset(x, y)

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=train_sampler,
    num_workers=num_workers
)
valid_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=valid_sampler,
    num_workers=num_workers
)

### Define Model

In [None]:
class InputBlock(nn.Module):
    def __init__(self, in_planes, out_planes, stride=1):
        super(InputBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=1, padding=0, bias=False)
        self.conv2 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.conv3 = nn.Conv2d(in_planes, out_planes, kernel_size=5, stride=1, padding=2, bias=False)

    def forward(self, x):
        out = self.conv1(x) + self.conv2(x) + self.conv3(x)
        out = F.relu(out)
        return out

class BasicBlock(nn.Module):
    def __init__(self, in_planes, out_planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.shortcut = nn.Sequential()

    def forward(self, x):
        out = F.relu(self.conv1(x))
        out = self.conv2(out)
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, in_planes=4, num_blocks=10):
        super(ResNet, self).__init__()
        self.layer1 = InputBlock(in_planes, out_planes=12, stride=1)
        self.layer2 = self.make_layer(BasicBlock, 12, num_blocks)
        self.layer3 = nn.Conv2d(12, 3, kernel_size=3, stride=1, padding=1, bias=False)
        self.flatten = nn.Flatten()

        self.fc1 = nn.Linear(1083, 256)
        self.fc2 = nn.Linear(256, 64)
        self.fc3 = nn.Linear(64, 3)

        self.dropout = nn.Dropout(0.25)


    def make_layer(self, block, planes, num_blocks):
        layers = []
        for n in range(num_blocks):
            layers.append(block(planes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.flatten(out)

        out = F.relu(self.fc1(out))
        out = self.dropout(out)
        out = F.relu(self.fc2(out))
        out = self.dropout(out)
        out = F.softmax(self.fc3(out), dim=1)
        return out

In [None]:
model = ResNet(in_planes=4, num_blocks=12)
model


### Make directory


In [None]:
# Make a directory to retore the paarameters of model of each epoch
import os

directory_path = f"./{version_name}"

# Check if the directory already exists
if not os.path.exists(directory_path):
    # If it doesn't exist, create the directory
    os.makedirs(directory_path)
    print(f"Directory '{directory_path}' created successfully.")
else:
    print(f"Directory '{directory_path}' already exists.")

### Training The Model

In [None]:
# check if CUDA is available
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
    mps_device = torch.device("mps")
else:
    print('CUDA is available!  Training on GPU ...')

# move tensors to GPU if CUDA is available
if train_on_gpu:
    model.cuda()
else:
    model.to(mps_device)

In [None]:
class_weights = torch.tensor([1.0, 1.0, 1.5])
class_weights = class_weights.cuda()

criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5000, factor=0.97, min_lr=5e-7)

In [None]:
epoch_num = 50

valid_loss_min = np.Inf

all_training_loss = []
all_validation_loss = []

lr = []

for epoch in range(1, epoch_num+1):

    train_loss = 0.0
    valid_loss = 0.0

    class_correct = list(0. for i in range(3))
    class_total = list(0. for i in range(3))

    ##### train the model #####
    model.train()
    for _, all_data in enumerate(train_loader):

        if train_on_gpu:
            data = all_data[0].cuda()
            target = all_data[1].cuda()
        else:
            data = all_data[0].to(mps_device)
            target = all_data[1].to(mps_device)

        lr.append(optimizer.param_groups[0]['lr'])

        # clear the gradients of all optimized variables
        optimizer.zero_grad()

        output = model(data)

        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        scheduler.step(loss)


        train_loss += loss.item() * data.size(0)


    ##### validate the model #####
    model.eval()
    for _, all_data in enumerate(valid_loader):
        if train_on_gpu:
            data = all_data[0].cuda()
            target = all_data[1].cuda()
        else:
            # data, target = data.to(mps_device), target.to(mps_device)
            data = all_data[0].to(mps_device)
            target = all_data[1].to(mps_device)

        output = model(data)
        loss = criterion(output, target)
        valid_loss += loss.item() * data.size(0)

        # convert output probabilities to predicted class
        _, pred = torch.max(output, 1)

        ans = []
        for i in range(batch_size):
            for j in range(3):
                if target[i][j]:
                    ans.append(j)
        ans = torch.tensor(np.array(ans))

        if train_on_gpu:
            pred = pred.cpu()
        else:
            pred = pred.cpu()

        correct_tensor = pred.eq(ans.data.view_as(pred))
        correct = np.squeeze(correct_tensor.numpy()) if not train_on_gpu else np.squeeze(correct_tensor.cpu().numpy())

        # calculate test accuracy for each object class
        for i in range(batch_size):
            label = ans.data[i]
            class_correct[label] += correct[i].item()
            class_total[label] += 1

    # calculate average losses
    train_loss = train_loss/len(train_loader.sampler)
    valid_loss = valid_loss/len(valid_loader.sampler)


    all_training_loss.append(train_loss)
    all_validation_loss.append(valid_loss)

    # print training/validation statistics
    print('Epoch: {} \n\tTraining Loss: {:.6f} \n\tValidation Loss: {:.6f} [Minimum Validation Loss in the History: {:.6f}]'.format(
        epoch, train_loss, valid_loss, valid_loss_min))

    for i in range(3):
        if class_total[i] > 0:
            print('\t\tValidation Accuracy of style %d: %2d%% (%2d/%2d)' % (
                i + 1, 100 * class_correct[i] / class_total[i],
                np.sum(class_correct[i]), np.sum(class_total[i])))
        else:
            print('\t\tTest Accuracy of style: N/A (no training examples)' % (i + 1))

    print('\t\tTotal Accuracy: %2.2f%% (%2d/%2d)' % (100 * np.sum(class_correct) / np.sum(class_total), np.sum(class_correct), np.sum(class_total)))

    torch.save(model.state_dict(), f"{directory_path}/{version_name}_{valid_loss:.6f}.pt")

    # save model if validation loss has decreased
    if valid_loss <= valid_loss_min:
        print('\t***** Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ... *****'.format(valid_loss_min, valid_loss))
        torch.save(model.state_dict(), f"{version_name}.pt")
        valid_loss_min = valid_loss



### Plot the result


In [None]:
# showing the training loss and validation loss during the training process
plt.plot(np.array(all_training_loss),label="training loss")
plt.plot(np.array(all_validation_loss), label="validation loss")
plt.title("Training Curve")
plt.legend()
plt.show()

### Test the Trained Network

In [None]:
def Test(data_path: str, model, dict_path: str, output_path: str):

    # read file
    df_test = open(data_path).read().splitlines()

    # convert into formatted np array
    games = [i.split(',', 1)[-1] for i in df_test]
    game_ids = [i.split(',', 1)[0] for i in df_test]

    x_test = []
    for game in games:
        moves_list = game.split(',')
        filtered_list = [element for element in moves_list if element]
        x_test.append(convert_game(filtered_list))

    x_test = np.array(x_test)

    # convert the np array into torch tensor
    x_test = torch.Tensor(x_test)

    # load the dictionary
    model.load_state_dict(torch.load(
        dict_path, map_location=torch.device('cpu')))
    model.eval()

    # evaluate the result
    with torch.no_grad():
        output = model(x_test)

    # convert the result into value
    ans = []
    for o in output:
        if (o[0] > 0.5):
            ans.append(1)
        elif (o[1] > 0.5):
            ans.append(2)
        else:
            ans.append(3)

    # append the answer to the csv file
    with open(output_path, 'a+') as f:
        for i in range(len(ans)):
            f.write(f"{game_ids[i]},{ans[i]}\n")


In [None]:
# Set up coordinate
chars = 'abcdefghijklmnopqrs'
coordinates = {k: v for v, k in enumerate(chars)}

def convert_game(moves):
    x = np.zeros((4, 19, 19))
    last_move_color = moves[-1][0]

    for move in moves:
        color = move[0]
        column = coordinates[move[2]]
        row = coordinates[move[3]]
        column = coordinates[move[2]]
        row = coordinates[move[3]]
        x[0, row, column] = 1

        if (color == last_move_color):
            x[2, row, column] = 1
        else:
            x[1, row, column] = 1

    last_move_column = coordinates[moves[-1][2]]
    last_move_row = coordinates[moves[-1][3]]
    x[3, row, column] = 1

    return x

In [None]:
load_dict_name = './model_3_3.pt'
TEST_PATH = "./Testing Dataset/play_style_test_public.csv"
output_path = "./public_prediction_model_3_3.csv"


In [None]:
model = model.to('cpu')
Test(TEST_PATH,model, load_dict_name,output_path)