In [None]:
import math
from typing import Tuple
from fractions import Fraction

import torch
import torchtext
from torchtext.data import get_tokenizer
from torchtext.legacy.data import Field, TabularDataset, BucketIterator
from torchtext import data

from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset
import torch.optim as optim

import numpy as np
import pickle
import matplotlib as plt

from tqdm import tqdm

# Load the pretrained GloVe vectors used by parse_data() (via `glove.stoi`) to decide
# which tokens are known words.
# The first time you run this will download a ~823MB file
glove = torchtext.vocab.GloVe(name="6B", # trained on Wikipedia 2014 corpus
                              dim=300)   # embedding size = 300

In [None]:
# Colab-only step: mount Google Drive at /content/gdrive, the prefix used by the
# data and checkpoint paths elsewhere in this notebook.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Folder on the mounted Drive that preprocess() reads the JSON splits from.
data_path = "/content/gdrive/My Drive/UTM/CSC413/Final Project/"

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
def parse_data(t):
    """Tokenize a question string for the `questions` field.

    The text is split with torchtext's "basic_english" tokenizer. A token that
    appears in the GloVe vocabulary (`glove.stoi`) is kept as a single token;
    any other token is replaced by its individual characters, in order.

    Args:
        t: the raw question text.

    Returns:
        A list of string tokens (whole words and/or single characters).
    """
    split_text = get_tokenizer("basic_english")
    pieces = []
    for candidate in split_text(t):
        # list(str) yields the characters, matching the per-character fallback.
        pieces += [candidate] if candidate in glove.stoi else list(candidate)
    return pieces

# Example usage (parse_data takes only the question text; `glove` is read as a module-level global):
# parse_data("What is prob of picking 1 b and 1 p when two letters picked without replacement from tpppbbpbbb?")
# parse_data("What is prob of picking 1 p and 1 y when two letters picked without replacement from {y: 1, p: 2, z: 1, n: 2}?")

In [None]:
def preprocess(data_path, batch_size=32, device="cpu", embedding="glove.6B.300d", shuffle=True):
    """Load the JSON splits and build the vocabulary and batch iterators.

    Args:
        data_path: directory containing "train(smaller).json",
            "valid(smaller).json" and "test(smaller).json".
        batch_size: batch size used for all three iterators.
        device: device passed to the iterators for the batches they produce.
        embedding: name of the pretrained vectors attached to the question vocab.
        shuffle: forwarded to `BucketIterator.splits`.

    Returns:
        A 7-tuple: (train dataset, validation dataset, test dataset,
        train iterator, validation iterator, test iterator, question Field).
    """
    # Questions are tokenized text with a vocabulary; solutions are raw floats.
    question_field = Field(sequential=True, use_vocab=True, tokenize=parse_data, lower=True)
    solution_field = Field(sequential=False, use_vocab=False, dtype=torch.float)

    # JSON key -> (attribute name on each example/batch, Field that processes it).
    field_map = {
        'question': ('q', question_field),
        'solution': ('s', solution_field),
    }

    train_split, valid_split, test_split = TabularDataset.splits(
        path=data_path,
        train="train(smaller).json",
        validation="valid(smaller).json",
        test='test(smaller).json',
        format='json',
        fields=field_map,
    )

    # The vocabulary is built from the training split only.
    question_field.build_vocab(train_split, max_size=10000, min_freq=1, vectors=embedding)

    # One batch size per dataset, in the same order as the datasets tuple.
    train_iter, valid_iter, test_iter = BucketIterator.splits(
        (train_split, valid_split, test_split),
        batch_sizes=(batch_size, batch_size, batch_size),
        device=device,
        shuffle=shuffle,
        sort=False,
    )

    return train_split, valid_split, test_split, train_iter, valid_iter, test_iter, question_field


# Build the datasets, the batch iterators and the question field once; later
# cells read these module-level names.
(
    train_data,
    valid_data,
    test_data,
    train_iterator,
    valid_iterator,
    test_iterator,
    questions_field,
) = preprocess(data_path, batch_size=256, device=device, shuffle=True)

In [None]:
def get_accuracy(model, data, tolerance=1e-3):
    """Fraction of examples whose prediction is within `tolerance` of the label.

    A prediction counts as correct when
    `np.isclose(prediction, label, rtol=tolerance, atol=tolerance)` holds.

    Args:
        model: module called as `model(batch.q, src_mask)`; it is switched to
            eval mode and left in eval mode.
        data: iterable of batches exposing `.q` (model input) and `.s` (labels).
        tolerance: used as both the relative and the absolute tolerance.
            The default (1e-3) matches the previously hard-coded value.

    Returns:
        correct / total as a float, or 0.0 when `data` yields no examples.
    """
    model.eval()
    correct = 0
    total = 0
    # Evaluation never calls backward(), so skip building autograd graphs.
    with torch.no_grad():
        for batch in data:
            src_mask = generate_square_subsequent_mask(len(batch.q)).to(device)
            # Move to CPU for the numpy comparison. reshape(-1) keeps the arrays
            # 1-D even when the model returns a 0-d tensor for a batch of one,
            # which previously made the builtin sum() raise.
            output = model(batch.q, src_mask).detach().cpu().numpy().reshape(-1)
            labels = batch.s.detach().cpu().numpy().reshape(-1)
            correct += int(np.isclose(output, labels, rtol=tolerance, atol=tolerance).sum())
            total += len(batch.s)
    # Guard against an empty iterator instead of raising ZeroDivisionError.
    return correct / total if total else 0.0

In [None]:
def train(model, train_data, valid_data, weight_decay=0.0,
           learning_rate=0.005, num_epochs=50,
           checkpoint_path='/content/gdrive/My Drive/CSC413/Final Project/ckpt',
           checkpoint_every=200, train_iter=None, valid_iter=None):
    """Train `model` with Adam on an MSE loss and plot training accuracy per epoch.

    Args:
        model: module called as `model(questions, src_mask)`.
        train_data, valid_data: accepted for backward compatibility but not read;
            the original implementation also ignored them and used the
            module-level iterators instead.
        weight_decay: Adam weight decay.
        learning_rate: Adam learning rate.
        num_epochs: number of passes over the training iterator.
        checkpoint_path: prefix for checkpoint files; the epoch number and
            ".dat" are appended. Pass None to disable checkpointing.
            NOTE(review): the default lacks the "UTM/" segment used by
            `data_path` elsewhere in this notebook -- confirm it is intended.
        checkpoint_every: save a checkpoint every this many epochs (a falsy
            value disables checkpointing). With the default of 200 nothing is
            saved unless num_epochs >= 200.
        train_iter, valid_iter: batch iterators to train / evaluate on. When
            None, the module-level `train_iterator` / `valid_iterator` are used,
            which is what the original code always did.

    Raises:
        ValueError: if the training iterator yields no batches.
    """
    # The notebook's top-level `import matplotlib as plt` binds the package
    # itself, which has no title/plot/show functions; import pyplot locally.
    import matplotlib.pyplot as plt

    if train_iter is None:
        train_iter = train_iterator
    if valid_iter is None:
        valid_iter = valid_iterator

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(),
                           lr=learning_rate,
                           weight_decay=weight_decay)

    # `losses` was appended to without ever being defined (NameError on a fresh
    # kernel). It is now a local list, kept alongside the accuracy history.
    iters_sub, losses, train_accs, val_accs = [], [], [], []

    # tqdm tracks progress over epochs
    for n in tqdm(range(1, num_epochs + 1)):
        # get_accuracy() switches the model to eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        epoch_loss = None
        for batch in train_iter:
            questions = batch.q.to(device)
            answers = batch.s.to(device)
            src_mask = generate_square_subsequent_mask(len(questions)).to(device)

            optimizer.zero_grad()    # clear gradients left over from the previous step
            # Match the target shape so a batch of one (0-d model output) is not
            # silently broadcast by MSELoss.
            zs = model(questions, src_mask).reshape(answers.shape)
            loss = criterion(zs, answers)
            loss.backward()          # compute updates for each parameter
            optimizer.step()         # make the updates for each parameter
            epoch_loss = loss.item() # loss of the most recent batch

        if epoch_loss is None:
            raise ValueError("train: the training iterator yielded no batches")

        # save the current training information (loss is that of the last batch)
        losses.append(epoch_loss)
        iters_sub.append(n)
        train_acc = get_accuracy(model, train_iter)
        train_accs.append(train_acc)
        val_acc = get_accuracy(model, valid_iter)
        val_accs.append(val_acc)
        print("\nEpoch: {}\nTrain Acc: {} %       Valid Acc: {} % \nTrain Loss: {}".format(n, round(train_acc, 5)*100, round(val_acc, 5)*100, epoch_loss))

        if (checkpoint_path is not None) and checkpoint_every and n % checkpoint_every == 0:
            torch.save(model.state_dict(), checkpoint_path + str(n) + ".dat")

    plt.title("Learning Curve: Accuracy per Epoch")
    plt.plot(iters_sub, train_accs, label="Train")
    # plt.plot(iters_sub, val_accs, label="Validation")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend(loc='best')
    plt.show()



In [None]:
class TransformerModel(nn.Module):
    """Transformer encoder followed by an MLP head that predicts one scalar per sequence.

    Token ids are embedded, passed through a stack of `nn.TransformerEncoderLayer`s,
    averaged over the sequence dimension and fed through nine linear layers
    (`decoder`, `layer1` .. `layer8`) with ReLU between them.

    The attribute names of the sub-modules are deliberately unchanged: they are
    the keys of `state_dict()` and therefore of any saved checkpoint.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        """
        Args:
            ntoken: size of the embedding table (vocabulary size).
            d_model: embedding / encoder feature size.
            nhead: number of attention heads per encoder layer.
            d_hid: feed-forward width inside each encoder layer.
            nlayers: number of encoder layers.
            dropout: probability for the single dropout applied after `layer1`.
                The encoder layers themselves are built with dropout 0.
        """
        super().__init__()
        self.model_type = 'Transformer'
        # Encoder-layer dropout is fixed at 0 here; `dropout` only drives self.dropout.
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, 0)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, 4096)
        self.layer1 = nn.Linear(4096, 4096)
        self.layer2 = nn.Linear(4096, 2048)
        self.layer3 = nn.Linear(2048, 2048)
        self.layer4 = nn.Linear(2048, 1024)
        self.layer5 = nn.Linear(1024, 1024)
        self.layer6 = nn.Linear(1024, 512)
        self.layer7 = nn.Linear(512, 512)
        self.layer8 = nn.Linear(512, 1)

        self.dropout = nn.Dropout(dropout)

        self.init_weights()

    def _head_layers(self):
        """Return the linear layers of the MLP head in forward order."""
        return (self.decoder, self.layer1, self.layer2, self.layer3, self.layer4,
                self.layer5, self.layer6, self.layer7, self.layer8)

    def init_weights(self) -> None:
        """Initialise embedding and head weights uniformly in [-0.1, 0.1]; zero the head biases."""
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        # Same order of random draws as the original hand-written sequence.
        for layer in self._head_layers():
            layer.weight.data.uniform_(-initrange, initrange)
            layer.bias.data.zero_()

    def forward(self, src: Tensor, src_mask: Tensor) -> Tensor:
        """
        Args:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]

        Returns:
            output Tensor of shape [batch_size] (one scalar per sequence)
        """
        src = self.encoder(src) * math.sqrt(self.d_model)
        output = self.transformer_encoder(src, src_mask)
        # Average over the sequence dimension -> [batch_size, d_model].
        output = torch.mean(output, 0)

        output = torch.relu(self.decoder(output))
        output = torch.relu(self.layer1(output))

        # The only dropout in the head sits between layer1 and layer2.
        output = self.dropout(output)

        for layer in (self.layer2, self.layer3, self.layer4,
                      self.layer5, self.layer6, self.layer7):
            output = torch.relu(layer(output))

        output = self.layer8(output)

        # Drop only the trailing feature dimension. A bare torch.squeeze() would
        # also remove the batch dimension when batch_size == 1 and return a 0-d
        # tensor, which breaks per-example comparisons downstream.
        return output.squeeze(-1)


def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Build an `sz` x `sz` mask: -inf strictly above the diagonal, 0 elsewhere."""
    neg_inf = float('-inf')
    all_blocked = torch.ones(sz, sz) * neg_inf
    # Keep only the strictly-upper triangle; the diagonal and below become 0.
    return all_blocked.triu(diagonal=1)

In [None]:
def get_test_accuracy(model, data, tolerance):
    """Fraction of examples whose prediction is within `tolerance` of the label.

    A prediction counts as correct when
    `np.isclose(prediction, label, rtol=tolerance, atol=tolerance)` holds.

    Args:
        model: module called as `model(batch.q, src_mask)`; it is switched to
            eval mode and left in eval mode.
        data: iterable of batches exposing `.q` (model input) and `.s` (labels).
        tolerance: used as both the relative and the absolute tolerance.

    Returns:
        correct / total as a float, or 0.0 when `data` yields no examples.
    """
    model.eval()
    correct = 0
    total = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in data:
            src_mask = generate_square_subsequent_mask(len(batch.q)).to(device)
            # reshape(-1) keeps both arrays 1-D even if the model returns a 0-d
            # tensor for a batch of one; the builtin sum() used before would
            # raise on that case.
            output = model(batch.q, src_mask).detach().cpu().numpy().reshape(-1)
            labels = batch.s.detach().cpu().numpy().reshape(-1)
            correct += int(np.isclose(output, labels, rtol=tolerance, atol=tolerance).sum())
            total += len(batch.s)
    # Guard against an empty iterator instead of raising ZeroDivisionError.
    return correct / total if total else 0.0

# Model hyperparameters. load_state_dict() below needs the constructed model's
# parameter names and shapes to match the checkpoint.
emsize = 300  # embedding dimension
ntoken = len(questions_field.vocab)
d_hid = 1024  # dimension of the feedforward network model in nn.TransformerEncoder
nlayers = 6  # number of nn.TransformerEncoderLayer in nn.TransformerEncoder
nhead = 3  # number of heads in nn.MultiheadAttention
dropout = 0.1  # dropout probability
model = TransformerModel(ntoken, emsize, nhead, d_hid, nlayers, dropout)
# Obtain previous params. map_location makes the load work on a CPU-only runtime
# even if the checkpoint was saved from a GPU.
model.load_state_dict(
    torch.load("/content/gdrive/My Drive/UTM/CSC413/Final Project/ckpt2/1950.dat",
               map_location=device)
)
# Dump to GPU after init
model = model.to(device)


# Test-set accuracy at a tolerance of 1e-3 (relative and absolute).
acc = get_test_accuracy(model,test_iterator, 1e-3)
print(f'Test accuracy: {round(acc*100, 3)}')