In [1]:
# import all libraries

import numpy as np
import sklearn.model_selection
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import torch.optim as optim
import torchvision.transforms as transforms
from collections import Counter
from pathlib import Path
from io import BytesIO


In [None]:
# import all libraries

import numpy as np
import sklearn.model_selection
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import torch.optim as optim
import torchvision.transforms as transforms
from collections import Counter
from pathlib import Path
from io import BytesIO

# from google.colab import files
# uploaded = files.upload()


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Here we train an RNN (with LSTM) predicting which player from the pool
# has participated in the game based on only the played moves.



# Setup

# import games in csv
Csv = "filtered_games.csv"

# Player names
Players = [
    "Carlsen, Magnus", "Firouzja, Alireza", "Caruana, Fabiano",
    "Nepomniachtchi, Ian", "Cramling Bellon, Anna", "Giri, Anish",
    "Niemann, Hans Moke", "Cramling, Pia", "Nakamura, Hikaru",
    "Botez, Alexandra", "Botez, Andrea", "Belenkaya, Dina",
    "So, Wesley",
]

# We use only 4 players
Players = Players[:4]

# Initialize batch size, needs to be big here for faster
# simulations on the laptop, and smaller when using LUMI
Batchsize = 32

# Define the lngth of the games (small for laptop and
# big when using LUMI)
Game_Length = 100

# Step size / Learning rate for the Adam optimizer
Stepsize = 2e-3

# Iterations / Epochs
Iterations = 50

# LSTM hidden dimension
LSTM_Hidden = 128

# Embedding dimension
Dimension_Embedded = 128






# Data loadeing

data = pd.read_csv(Csv) # loading into dataframe


# we set names (coloumns) as strings
data["white_name"] = data["white_name"].astype(str)
data["black_name"] = data["black_name"].astype(str)



# Now we only want to do this for top 4 players (with most data)
# Since we are running this on a laptop and for the dataset to
# be more balanced
def player_match(name:str):
    # Return player name if in sub-string
    lowered = name.lower()
    for player in Players:
        if player.lower() == lowered:
            return player
    return None

# Now determining player of each game
w = data["white_name"].apply(player_match)
b = data["black_name"].apply(player_match)
data["PlayerLabel"] = w.fillna(b) # if not white go back to black

# Remove all other games
data = data.dropna(subset=["PlayerLabel"]).reset_index(drop=True)





# now we map locally
encodep = dict(zip(Players, range(len(Players))))
decodep = {m: l for l, m in encodep.items()}


# we make a temperory split for training tokenization
dat_temp = data
l = len(dat_temp)

traindata, _, _ = random_split(
    range(l),
    [int(l*0.8), int(l*0.1), l - int(l*0.8) - int(l*0.1)], # test data
    generator=torch.Generator().manual_seed(123) # seed
)





# Tokenization (from used steps/moves)

# Using cleaner to take unwanted things out
cleaner = str.maketrans({"[": "", "]": "", "'": "", ",": ""})
all_step = [ # flatten moves from games
    k
    for s in data.loc[traindata.indices, "list_of_moves"]
    for k in s.translate(cleaner).split()
]
# count frequency of moves
frequency = Counter(all_step)

# keeping it simple (just anything we dont know)
Dir = {"<PAD>": 0, "<UNK>": 1}
# Assign id
Dir.update({n: len(Dir) + i for i, n in enumerate(frequency)})





def step_encode(step): # convert raw mos into tokens
    # This pads using zeroes and trunctaes with game-lenth
    cleaned = step.translate(str.maketrans({"[": "", "]": "", "'": "", ",": ""}))
    tokening = cleaned.split()
    # use ids
    # use UNK(1) when it is unknown
    vector = list(map(lambda i_token: Dir.get(i_token, 1), tokening[: Game_Length]))
    pad = np.zeros(Game_Length - len(vector), dtype=int).tolist()
    vector = vector + pad
    return vector





# Now we handle the data in a class

class Gamesequence(Dataset): # wrapping dataset
    def __init__(object, wind): # store frame
        object.win = wind
        # convert names inot int labeling for class
        object.labels = wind["PlayerLabel"].map(encodep).to_list()
        object.moves  = wind["list_of_moves"].to_list() # save for encoding

    def __len__(object): # pytorch standard
        length = object.win.__len__() # nr of samples
        return length

    def __getitem__(object, m): # get training from index
        # id's of tokens
        x = torch.tensor(step_encode(object.moves[m]), dtype=torch.long)
        # player
        y = torch.tensor(object.labels[m], dtype=torch.long)
        return x, y






# Now we spilt the data into train, validation and
# test data as learnt in the lectures

# Use 80, 10, 10 split
dat = Gamesequence(data)
l = len(dat)
traindata, validationdata, testdata = random_split(
    dat,
    [int(l * 0.8), # train data
     int(l * 0.1), # validation data
     # to avoid rounding problems we subtract:
     l - int(l * 0.8) - int(l * 0.1)], # test data
    generator=torch.Generator().manual_seed(123)
    # we choose seed 123 this time, works better
)

# Suffle only train, not the others
training = DataLoader(traindata, batch_size = Batchsize, shuffle = True)
validating = DataLoader(validationdata, batch_size = Batchsize)
testing = DataLoader(testdata, batch_size = Batchsize)


# Cross Entropy loss (ideal and simple for classification tasks)
criterion = nn.CrossEntropyLoss()









# Now we build the RNN model with  2-layer bidirectional LSTM

class RecurrentNN(nn.Module):
    def __init__(object, Dir, Dimension_Embedded, LSTM_Hidden, Dimension_out):
        super(RecurrentNN, object).__init__() # initilaze pytorch nn.module

        # lookup table for the tokens
        object.table = nn.Embedding( # embedding for tokens
            num_embeddings=Dir,
            embedding_dim=Dimension_Embedded, # size of embeddings
            padding_idx=0 # telling torch 0's are padding, not actual moves
        )

        # Core
        object.core = nn.LSTM( # lstm core
            input_size=Dimension_Embedded, # use embedding vector
            # hidden dimension
            hidden_size=LSTM_Hidden,
            # 2 layers for a bit better results
            num_layers=2,
            batch_first=True,
            bidirectional=True, # backward and forward
            dropout=0.25 # minimizing overfitting (drop 25% units randomly)
            # For 2 layers it is ignored but for layers >= 3 (for later) it is good to have
        )

        # projection block outputting
        object.proj = nn.Sequential( # classifier part
            nn.Dropout(0.35), # same like before (being less dependent on single neurons)
            nn.Linear(2 * LSTM_Hidden, Dimension_out) # since bidirectional
        )

    def forward(object, step): # input flow
        step = object.table(step) # embedding tokens
        _,(state,_) = object.core(step) # running sequence

        # now we have the Last backward and forward states (hidden)
        fowardstate  = state[-2] # for 2 layers
        backwardstate = state[-1]

        # here we have a single vector concatenation
        vector = torch.cat([fowardstate, backwardstate], dim=1)

        return object.proj(vector)

model = RecurrentNN( # Building model
    Dir=len(Dir),
    Dimension_Embedded=Dimension_Embedded,
    LSTM_Hidden=LSTM_Hidden,
    Dimension_out=len(encodep)
).to(device)

# Use adam optimizer as i almost always do
optimization = torch.optim.Adam(
    params=list(model.parameters()),
    lr=Stepsize
)







# Now we train

vbest = 0.0
pat = 10
count_pat = 0
mbest = None

for iter in range(Iterations): # run iterations/epoch

    model.train() # training mode activation before updating gradients
    # Initialize variables
    loss_running = 0
    hit = 0
    seen = 0

    for xbatch, ybatch in tqdm(training, colour='green'): # iterating batches
        xbatch = xbatch.to(device)
        ybatch = ybatch.to(device)

        # reset from last batch
        optimization.zero_grad()

        # forward pass computation
        modeloutput = model(xbatch)

        # cross entropy loss
        lossval = criterion(modeloutput, ybatch)

        # back propagate
        lossval.backward()

        # Update
        optimization.step()

        # running loss
        loss_running += lossval.detach().cpu().item()

        # Accuracy (no gradients required)
        # Reduces computational cost minimally,
        # depending on GPU load
        with torch.inference_mode():
            # prediction (highest logit)
            g = modeloutput.argmax(1)

            # hits (correct)
            hit += (g == ybatch).sum().item()

            # samples processed
            seen += ybatch.size(0)

    # computing accuracy (avoiding division by 0)
    accuracy_training = hit / (1 if seen == 0 else seen)
    print(
        f"iteration nr {iter + 1} out of {Iterations}"
        f" running loss = {loss_running: .4f}"
        f"\ntraining accuracy = {accuracy_training: .4f}"
    )

    # And validate simultaniously

    # same again
    model.eval()
    hit = 0
    seen = 0

    with torch.inference_mode(): # without gradient update for evaluation
        for x, y in validating:
            x = x.to(device)
            y = y.to(device)

            modeloutput = model(x)
            g = modeloutput.argmax(1)
            hit += (g == y).sum().item()
            seen += y.size(0)

    accuracy_validation = hit / (1 if seen == 0 else seen)

    print(f"validation accuracy = {accuracy_validation: .4f}")

    # early stopping
    if accuracy_validation > vbest:
        vbest = accuracy_validation
        count_pat = 0
        # save mbest
        mbest = {
            k: v.clone().detach().to(device)
            for k, v in model.state_dict().items()
        }
        
    else:
        count_pat += 1

    if count_pat >= pat:
        break


# use mbest:
if mbest is not None:
    model.load_state_dict(mbest)




# Now we test on the test data at the end

# same again
model.eval()
hit = 0
seen = 0

with torch.inference_mode():
    for x, y in testing:
        x = x.to(device)
        y = y.to(device)

        modeloutput = model(x)
        g = modeloutput.argmax(1)
        hit += (g == y).sum().item()
        seen += y.size(0)

accuracy_test = hit / (1 if seen == 0 else seen)
print(f"test accuracy = {accuracy_test: .4f}")



100%|[32m██████████[0m| 39/39 [00:35<00:00,  1.09it/s]


iteration nr 1 out of 30  running loss =  52.4106   training accuracy =  0.3338   validation accuracy =  0.3648


100%|[32m██████████[0m| 39/39 [00:36<00:00,  1.06it/s]


iteration nr 2 out of 30  running loss =  50.3488   training accuracy =  0.3950   validation accuracy =  0.3817


100%|[32m██████████[0m| 39/39 [00:44<00:00,  1.13s/it]


iteration nr 3 out of 30  running loss =  48.0946   training accuracy =  0.4403   validation accuracy =  0.4018


100%|[32m██████████[0m| 39/39 [00:54<00:00,  1.39s/it]


iteration nr 4 out of 30  running loss =  45.6571   training accuracy =  0.4880   validation accuracy =  0.3965


100%|[32m██████████[0m| 39/39 [00:48<00:00,  1.24s/it]


iteration nr 5 out of 30  running loss =  41.7157   training accuracy =  0.5516   validation accuracy =  0.4088


100%|[32m██████████[0m| 39/39 [00:45<00:00,  1.17s/it]


iteration nr 6 out of 30  running loss =  36.1503   training accuracy =  0.6257   validation accuracy =  0.4244


 13%|[32m█▎        [0m| 5/39 [00:06<00:43,  1.29s/it]


KeyboardInterrupt: 