# Introduction

<a href="https://colab.research.google.com/drive/1wHPGH00jiXZUCCsaCI6dreoZdNqTyijK?usp=share_link"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="google colab logo" height=35px></a>

Check out the [PDF report](https://github.com/andrear632/ProjectDeepLearning/blob/main/Report_Project_Deep_Learning.pdf), our [GitHub repository](https://github.com/andrear632/ProjectDeepLearning), and the [WandB interactive report](https://api.wandb.ai/links/project_dl/dhyw7j7v).

This is the Colab notebook for our Deep Learning project, in which we try several models to answer mathematical questions.

The first thing we did was read the dataset paper [1] in which the dataset is described. This allowed us to better understand the end goal of the authors and helped us formulate some ideas on how to approach the task.

We decided to implement some of the models discussed in the paper; in this way, we would have a baseline to compare with our implementation. In particular, we thought that it would be interesting to compare the results obtained by an LSTM model and a Transformer model because they both can work with sequence-to-sequence tasks but are based on different strategies.

Before analysing the selected models, we should mention some limitations that influenced our work. Google Colab’s free tier provides limited computational power, which severely restricted how many samples we could use and how long we could train and test the models. Indeed, after about four hours of GPU use, the account is temporarily suspended for exceeding the GPU availability time.

This means that we had to choose a relevant subset of the dataset so as not to exceed the available resources, and that we were unable to train for as long as needed to reach the accuracies reported in the dataset paper [1] and by the state of the art [2].

# Imports
In this section we import all the libraries needed to run our models and set some useful variables.

In [None]:
pip install wget pytorch_lightning datasets wandb --quiet 

In [None]:
pip install --upgrade gdown --quiet

In [None]:
# Importing dependencies
import json
import os
import wget
import random
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import random_split, DataLoader

from pytorch_lightning import Trainer, LightningDataModule, LightningModule
from pytorch_lightning.loggers import WandbLogger

from datasets import load_dataset

import wandb

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# Dataset and Preprocessing
The following class is used to handle all the operations to retrieve and prepare the dataset.

We use this class to generate four dataloaders: one dataloader for the training phase, another one for the validation phase and two dataloaders for the testing phase (interpolate and extrapolate).

To download the data and create the dataset we use the load_dataset function from Hugging Face together with custom loading scripts in which we define the data source and the splits (they can be found in [our GitHub repository](https://github.com/andrear632/ProjectDeepLearning)).

In this class we also define a method that extracts relevant information from the dataset: the dictionary containing all possible characters and their corresponding indexes, the maximum length of questions and the maximum length of answers.

Using these metadata we encode the dataset so that it can be used by the models. We also provide a method to encode new questions not present in the dataset.
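
As a rough illustration of the encoding described above, here is a minimal pure-Python sketch. It is not the notebook's implementation: `build_vocab`, `encode` and the toy strings are hypothetical names and data introduced only for this example, and the vocabulary is built from plain strings rather than from the JSON files. It shows the idea of wrapping each string in start and end tokens and right-padding it to a common length.

```python
PAD, SOS, EOS = "&", "#", "@"  # same special characters used by the data module


def build_vocab(strings):
    """Map each character to an index; the special tokens take 0, 1 and 2."""
    char_to_idx = {PAD: 0, SOS: 1, EOS: 2}
    for s in strings:
        for ch in s:
            if ch not in char_to_idx:
                char_to_idx[ch] = len(char_to_idx)
    return char_to_idx


def encode(text, char_to_idx, max_length):
    """Return [SOS] + character indexes + [EOS], right-padded to max_length + 2."""
    indexes = [char_to_idx[SOS]]
    indexes += [char_to_idx[ch] for ch in text]  # KeyError on unknown characters
    indexes.append(char_to_idx[EOS])
    indexes += [char_to_idx[PAD]] * (max_length - len(text))
    return indexes


samples = ["2+2", "10"]  # hypothetical toy strings, not taken from the dataset
vocab = build_vocab(samples)
max_len = max(len(s) for s in samples)
encoded = [encode(s, vocab, max_len) for s in samples]
print(encoded)
```

Every encoded sequence has length `max_len + 2`, which is why `get_max_lengths` in the class below adds 2 to the stored maximum lengths.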


In [None]:
class MathDataModule(LightningDataModule):
  def __init__(self, seed: int = 16, batch_size: int = 1024, workers: int = 2, chosen_dataset: int = 0):  # workers: 12 for Colab Pro, 2 for Colab free
    super().__init__()

    self.batch_size = batch_size
    # '&' is the padding character,
    # '#' is the start of sentence char,
    # '@' is the end of sentence char
    self.char_to_idx = {"&": 0, "#": 1, "@": 2}
    self.question_max_length = 0
    self.answer_max_length = 0
    self.manual_seed = seed
    self.workers = workers

    if chosen_dataset == 0:
      # only round, 10000 samples per module
      self.url = "https://raw.githubusercontent.com/andrear632/ProjectDeepLearning/main/loading_script_round_10000.py"
    elif chosen_dataset == 1:
      # only round, 2000000 samples per module
      self.url = "https://raw.githubusercontent.com/andrear632/ProjectDeepLearning/main/loading_script_round_2000000.py"
    elif chosen_dataset == 2:
      # only round + composed, 2000000 samples per module
      self.url = "https://raw.githubusercontent.com/andrear632/ProjectDeepLearning/main/loading_script_round+composed_2000000.py"
    elif chosen_dataset == 3:
      # only differentiate + round, 2000000 samples per module
      self.url = "https://raw.githubusercontent.com/andrear632/ProjectDeepLearning/main/loading_script_differentiate+round_2000000.py"
    elif chosen_dataset == 4:
      # only differentiate + round + evaluate, 2000000 samples per module
      self.url = "https://raw.githubusercontent.com/andrear632/ProjectDeepLearning/main/loading_script_differentiate+round+evaluate_2000000.py"

  # downloading the dataset and extracting relevant information
  def prepare_data(self): 
    wget.download(self.url, "/content/loading_script.py")
    load_dataset('/content/loading_script.py', download_mode="force_redownload")
    self._extract_metadata("/content/jsondataset/")
  
  def setup(self, stage: str):

    if stage == "fit":  # setup dataset for train and validation phases
      
      # Loading train split and encoding the samples into indexes
      dataset = load_dataset('/content/loading_script.py', split="train").map(self._encode)
      dataset.set_format(type="torch")

      # Splitting into train and validation sets
      train_len = dataset.num_rows
      split = [int(train_len*0.9), train_len - int(train_len*0.9)]
      self.dataset_train, self.dataset_val = random_split(dataset, split, generator=torch.Generator().manual_seed(self.manual_seed))

    if stage == "test":  # setup dataset for test phase

      # Loading test splits and encoding the samples into indexes
      interpolate = load_dataset('/content/loading_script.py', split="interpolate").map(self._encode)
      extrapolate = load_dataset('/content/loading_script.py', split="extrapolate").map(self._encode)

      interpolate.set_format(type="torch")
      extrapolate.set_format(type="torch")

      self.dataset_interpolate = interpolate
      self.dataset_extrapolate = extrapolate
  
  def train_dataloader(self):
    return DataLoader(self.dataset_train, batch_size=self.batch_size, shuffle=True, drop_last=True, num_workers=self.workers)
  
  def val_dataloader(self):
    return DataLoader(self.dataset_val, batch_size=self.batch_size, drop_last=True, num_workers=self.workers)

  def test_dataloader(self):
    dl1 = DataLoader(self.dataset_interpolate, batch_size=self.batch_size, drop_last=True, num_workers=self.workers)
    dl2 = DataLoader(self.dataset_extrapolate, batch_size=self.batch_size, drop_last=True, num_workers=self.workers)
    return [dl1, dl2]

  # Function that extracts information from the dataset (question_max_length, answer_max_length) and builds the dictionary
  def _extract_metadata(self, dir_path): 

    for folder in ["interpolate", "extrapolate", "train"]:
      folder_path = dir_path + folder + "/"

      for file_name in os.listdir(folder_path):
        with open(folder_path + file_name, "r") as input_file:

          for line_no, line in enumerate(input_file):
            line = line.strip()

            for char in line:  # populating dictionary
              if char not in self.char_to_idx:
                self.char_to_idx[char] = len(self.char_to_idx)
            
            line = json.loads(line)

            if line_no % 2 != 0:  # analysing answer line
              if len(line["answer"]) > self.answer_max_length:
                self.answer_max_length = len(line["answer"])
            else:  # analysing question line
              if len(line["question"]) > self.question_max_length:
                self.question_max_length = len(line["question"])


    self.idx_to_char = {value: key for key, value in self.char_to_idx.items()}  # building the inverse dictionary

  # Function that encodes both questions and answers using the dictionary of the dataset
  def _encode(self, input):

    dictionary_for_variables = {
        "question": (self.question_max_length, []),
        "answer": (self.answer_max_length, [])
    }

    for key in dictionary_for_variables.keys():

      string_to_turn = input[str(key)]
      dictionary_for_variables[key][1].append(1)  # adding start of line char 

      for char in string_to_turn:

        if char not in self.char_to_idx.keys():
          raise Exception("character not found in dict")

        dictionary_for_variables[key][1].append(self.char_to_idx[char])  # turning character into index

      dictionary_for_variables[key][1].append(2)  # adding end of line char

      for i in range(dictionary_for_variables[key][0]-len(input[str(key)])):
        dictionary_for_variables[key][1].append(0)  # adding padding after 

    return {"question": dictionary_for_variables["question"][1], "answer": dictionary_for_variables["answer"][1]}
  
  # Function to encode questions not from the dataset
  def encode_question(self, input):
    question = []
    question.append(1)  # adding start of line char 

    for char in input:
      if char not in self.char_to_idx.keys():
        raise Exception("character " + char + " not found in dict")
      question.append(self.char_to_idx[char])  # turning character into index
    
    question.append(2)  # adding end of line char 

    for i in range(self.question_max_length-len(input)):
        question.append(0)  # adding padding after 

    question = torch.tensor(question, device=device).unsqueeze(0)

    return {"question": question}

  def get_dictionaries(self):  # returns dictionaries: char_to_idx and idx_to_char
      return (self.char_to_idx, self.idx_to_char)

  def get_dictionary_size(self):  # returns length of the dictionary
      return (len(self.char_to_idx))

  def get_max_lengths(self):  # returns the maximum lengths of questions and answers
      return (self.question_max_length + 2, self.answer_max_length + 2)

      

# Model definitions
Run the utilities section and the section corresponding to the model you want to use.

## Utilities
This section contains utility functions that are called by the model classes.

The metric that we use to assess the performance of the models is the accuracy of getting the correct answer to the question. In more detail, we follow the same criterion proposed in the dataset paper [1] in Paragraph 2.4: a predicted answer is correct if and only if it matches the correct answer character for character. This also means that the model must correctly predict the end-of-string token, otherwise the predicted answer is counted as wrong.
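
The criterion can be sketched on plain lists of indexes. This is a simplified illustration, not the function used in the notebook: `strip_special`, `exact_match_accuracy` and the toy batch are hypothetical, the predictions are assumed to be already converted to indexes, and the check for the presence of a start token is omitted.

```python
SOS, EOS = 1, 2  # start/end-of-sequence indexes, as in the data module


def strip_special(indexes):
    """Drop the leading start token and everything from the first end token on.

    Returns None when no end token is present, so the answer counts as wrong.
    """
    if EOS not in indexes:
        return None
    return indexes[1:indexes.index(EOS)]


def exact_match_accuracy(predicted, correct):
    """Fraction of predictions equal to the target character for character."""
    hits = 0
    for pred, target in zip(predicted, correct):
        stripped = strip_special(pred)
        if stripped is not None and stripped == strip_special(target):
            hits += 1
    return hits / len(predicted)


# Hypothetical toy batch: indexes 3..6 stand for arbitrary characters.
targets = [[1, 3, 4, 2, 0], [1, 5, 2, 0, 0], [1, 6, 6, 2, 0]]
predictions = [
    [1, 3, 4, 2, 0],  # exact match
    [1, 5, 6, 2, 0],  # one extra character, so it is wrong
    [1, 6, 6, 0, 0],  # end token never produced, so it is wrong
]
accuracy = exact_match_accuracy(predictions, targets)
print(accuracy)
```

Anything generated after the first end token is ignored, so only the characters between the start and end tokens take part in the comparison.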

In [None]:
# Function that computes the accuracy as defined in the "mathematics dataset" paper [1]
def paper_accuracy(predicted_answers, correct_answers):
  num_correct_answers = 0

  # constant tokens
  start_of_line = 1
  end_of_line = 2

  for i in range(len(predicted_answers)):

    # Preparing predicted answer
    single_predicted_answer = torch.argmax(predicted_answers[i], 1).tolist()  # vector of shape (answer_max_length) (concatenates the max value for each row)
      
    if (start_of_line in single_predicted_answer and end_of_line in single_predicted_answer):  # check that both start and end of line chars are present
      single_predicted_answer = single_predicted_answer[1:single_predicted_answer.index(end_of_line)]  # removing start and end of line char and additional characters
    else:  # predicted answer is wrong
      continue 

    # Preparing correct answer
    single_correct_answer = correct_answers[i].tolist()
    single_correct_answer = single_correct_answer[1:single_correct_answer.index(end_of_line)]  # removing start of line, end of line and following characters
      
    # If the predicted answer and the correct one have the same exact characters, the predicted answer is correct
    if (single_predicted_answer == single_correct_answer):
      num_correct_answers += 1

  return num_correct_answers/len(predicted_answers)

  

In [None]:
def print_batch(dictionary, questions, res):
  
  questions = questions["question"]
  index_list = torch.argmax(res[0], 1) # reverse one_hot
  question_string = answer_string = ""

  for j in range(len(questions[0])):
    question_string = question_string + dictionary[questions[0][j].item()]

  for j in range(len(index_list)):
    answer_string = answer_string + dictionary[index_list[j].item()]

  question_string = question_string[1:question_string.index("@")]
  if "@" in answer_string:
    answer_string = answer_string[1:answer_string.index("@")]
  else:
    answer_string = "cannot generate answer"
        
  print(question_string, answer_string)

In [None]:
def print_correct(dictionary, batch, res):
  
  for i in range(len(res)):
    
    index_list = torch.argmax(res[i], 1) # reverse one_hot
    predicted_string = answer_string = question_string = ""

    for j in range(len(batch["question"][i])):
      question_string = question_string + dictionary[batch["question"][i][j].item()]

    for j in range(len(index_list)):
      predicted_string = predicted_string + dictionary[index_list[j].item()]
      
    for j in range(len(batch["answer"][i])):
      answer_string = answer_string + dictionary[batch["answer"][i][j].item()]

    print(question_string, predicted_string, answer_string)

    

## LSTM
The first model that we implement for this task is inspired by the "Simple LSTM" presented in the dataset paper [1]. Its architecture is straightforward: it is made up of a single LSTM cell that sequentially processes the question and the answer, one character at a time.
Furthermore, as suggested in the dataset paper [1], we introduce 16 "thinking steps" before outputting the answer, so that the model has additional computation steps to process the question. To do so, we pass a zero input to the LSTM, which updates the hidden state and the cell state.

During training, we also introduced teacher forcing to improve the results. In this way, when computing a character of the answer, the model receives as input the previous correct character with a fixed probability instead of the predicted one.
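
The input selection behind teacher forcing can be sketched without any neural network. The example below is only an illustration under stated assumptions: `decode_with_teacher_forcing` and the toy `always_plus_one` "model" are hypothetical, and the loop is simplified with respect to the `forward` method of the class that follows.

```python
import random


def decode_with_teacher_forcing(step_fn, target, ratio, rng):
    """Run a decoder step by step, choosing each next input at random.

    step_fn(token) -> predicted next token (hypothetical stand-in for the model).
    With probability `ratio` the next input is the correct token from `target`;
    otherwise it is the model's own prediction.
    """
    predictions = []
    token = target[0]  # start-of-sequence token
    for i in range(1, len(target)):
        predicted = step_fn(token)
        predictions.append(predicted)
        use_teacher = rng.random() < ratio
        token = target[i] if use_teacher else predicted
    return predictions


def always_plus_one(token):
    """Toy 'model' that always predicts token + 1."""
    return token + 1


target = [1, 5, 9, 2]
rng = random.Random(0)
fully_forced = decode_with_teacher_forcing(always_plus_one, target, 1.0, rng)
free_running = decode_with_teacher_forcing(always_plus_one, target, 0.0, rng)
print(fully_forced, free_running)
```

With a ratio of 1.0 every step sees the correct previous token, so early mistakes do not propagate. With a ratio of 0.0 the model only sees its own outputs, which is also what happens at evaluation time.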

In [None]:
class LSTM(LightningModule):
  def __init__(self, dict_size, sizes, dictionaries, teacher_forcing_ratio: float = 0.5):
    super().__init__()

    self.hidden_units = 2048
    self.question_max_length = sizes[0]
    self.answer_max_length = sizes[1]
    self.dict_size = dict_size
    self.teacher_forcing_ratio = teacher_forcing_ratio
    self.dictionary = dictionaries[1]

    # Initializing used layers
    self.lstm = nn.LSTM(self.dict_size, self.hidden_units)
    self.linear_layer = nn.Linear(self.hidden_units, dict_size, bias=False)  # used to return vectors of shape (dict_size)

  def forward(self, batch):

    # Preparing the batch for processing.
    # We transpose the batch from shape (batch_size, question_max_length) to (question_max_length, batch_size)
    # and one-hot encode it to match the (seq_len, batch_size, input_size) layout required by nn.LSTM

    batch_size = len(batch["question"])

    batch_questions = batch["question"]  # shape (batch_size, question_max_length)
    batch_questions = torch.transpose(batch_questions, 0, 1)  # shape (question_max_length, batch_size)
    batch_questions = F.one_hot(batch_questions, self.dict_size)  # shape (question_max_length, batch_size, dict_size)
    batch_questions = batch_questions.float()

    if self.training:
      batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)
      batch_answers = torch.transpose(batch_answers, 0, 1)  # shape (answer_max_length, batch_size)
      batch_answers = F.one_hot(batch_answers, self.dict_size)  # shape (answer_max_length, batch_size, dict_size)
      batch_answers = batch_answers.float()

    # Initializing hidden_state and cell_state used by the lstm cell. In our case D = 1 (because it is not bidirectional) and num_layer = 1
    hidden_state = torch.zeros(1, batch_size, self.hidden_units, requires_grad=True, dtype=torch.float).to(device)  # shape (D*num_layers, batch_size, H_out)
    cell_state = torch.zeros(1, batch_size, self.hidden_units, requires_grad=True, dtype=torch.float).to(device)  # shape (D*num_layers, batch_size, H_cell)

    # Initializing result tensor
    result = torch.empty(self.answer_max_length, batch_size, self.dict_size).to(device)  # shape (answer_max_length, batch_size, dict_size)

    self.lstm.flatten_parameters()  # used to improve performance on GPU

    # Generating hidden_state and cell_state representing the questions, at each iteration 'i' 
    # we feed to the lstm cell the i-th character of each question in the batch.
    # input shape (1, batch_size, dict_size)
    # output shape (1, batch_size, hidden_size)

    for i in range(self.question_max_length):
      output, (hidden_state, cell_state) = self.lstm(batch_questions[i].unsqueeze(0), (hidden_state, cell_state))
      
    # Thinking steps used to improve performance
    think_vector = torch.zeros_like(batch_questions[0].unsqueeze(0)).float()

    for i in range(16):
      _, (hidden_state, cell_state) = self.lstm(think_vector, (hidden_state, cell_state))

    # Generating answer using hidden_state and cell_state.
    # Depending on whether we are in training or in evaluation, we prepare the input for the lstm cell in a different way:
    # - training: with probability 'teacher_forcing_ratio' we feed the correct previous char as input, otherwise we use the predicted one.
    # - evaluation: at the first step we pass the encoded start of line char, afterwards we pass the previous predicted char
    for i in range(self.answer_max_length):

      result_temp = self.linear_layer(output[0])  # shape (batch_size, dict_size)
      result[i] = result_temp

      if self.training:
        teacher_force = random.random() < self.teacher_forcing_ratio  # true or false with prob 'teacher_forcing_ratio'

        if teacher_force:  # use correct previous char
          input = batch_answers[i].unsqueeze(0)  # shape (1, batch_size, dict_size)
        else:  # use previous predicted char 
          input = F.one_hot(torch.argmax(result_temp, 1), self.dict_size).unsqueeze(0).float().to(device)  # shape (1, batch_size, dict_size)
      
      else:
        if i == 0:
          input = F.one_hot(torch.tensor([1]), self.dict_size).repeat(batch_size, 1).unsqueeze(0).float().to(device)  # shape (1, batch_size, dict_size)
        else:
          input = F.one_hot(torch.argmax(result_temp, 1), self.dict_size).unsqueeze(0).float().to(device)  # shape (1, batch_size, dict_size)
      
      output, (hidden_state, cell_state) = self.lstm(input, (hidden_state, cell_state))  # shape (1, batch_size, hidden_size)

    return torch.transpose((result), 0, 1)  # shape (batch_size, answer_max_length, dict_size)

  def training_step(self, batch, _):

    # Preparing inputs  
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)
    batch_answers = batch_answers.flatten(0, 1)  # shape (batch_size * answer_max_length)

    # Computing prediction and loss
    pred = self(batch).flatten(0, 1)  # shape (batch_size * answer_max_length, dict_size)
    loss = F.cross_entropy(pred, batch_answers, ignore_index=0)  # 'ignore_index' allows us to ignore the padding added by the data loader
    
    self.log("train_loss", loss)
    
    return loss

  def validation_step(self, batch, _):

    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy 
    pred = self.predict(batch)  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper

    self.log("val_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    return accuracy

  def test_step(self, batch, batch_idx, dataset_idx):
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy 
    pred = self.predict(batch.copy())  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper
    
    if (batch_idx == 0):
      print_correct(self.dictionary, batch, pred)

    self.log("test_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    
    return accuracy
  
  def predict(self, questions):
    return self(questions)

  def print_predict(self, questions):
    pred = self.predict(questions.copy())
    print_batch(self.dictionary, questions, pred)
    return

  def configure_optimizers(self):
    return torch.optim.Adam(self.parameters(), lr=6e-4, betas=(0.9, 0.995), eps=1e-9)



## Transformer
The second model we chose to implement is a standard Transformer that follows the architecture described in the paper "Attention Is All You Need" (Vaswani et al.) [3]. We used common values for the hyperparameters, such as 8 heads for each multi-head attention component and 6 layers for both the encoder and the decoder. We also tested the model with dropout layers, as suggested in the Transformer paper [3], but we obtained worse results, so we decided to disable dropout.
The weights of the embedding layer and the final linear layer are shared to ensure better decoding, as in the Transformer paper [3]. We also use sinusoidal positional encodings so that the model can make use of the order of the sequences. Besides the usual mask needed by the masked multi-head attention mechanism, we implemented padding masks to ensure that the padding at the end of each question and answer is not considered when computing the attention scores.
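
To make the positional encodings and the two kinds of mask more concrete, here is a small pure-Python sketch. It is not the code used by the classes below: `sinusoidal_encoding` and `attention_mask` are hypothetical helpers, the embedding size is assumed to be even, and the mask is returned as booleans (`True` means "do not attend") rather than as the `-inf` values added to the attention scores.

```python
import math


def sinusoidal_encoding(max_len, d_model):
    """PE[pos][i] = sin(pos / 10000^(i/d_model)) for even i, cos of the same angle at i + 1."""
    table = [[0.0] * d_model for _ in range(max_len)]
    for pos in range(max_len):
        for i in range(0, d_model, 2):  # i runs over the even dimensions
            angle = pos / (10000 ** (i / d_model))
            table[pos][i] = math.sin(angle)
            table[pos][i + 1] = math.cos(angle)
    return table


def attention_mask(tokens, pad_idx=0, causal=False):
    """Return mask[q][k] == True where query q must NOT attend to key k."""
    n = len(tokens)
    mask = []
    for q in range(n):
        row = []
        for k in range(n):
            is_padding = tokens[k] == pad_idx  # padding mask: hide padded keys
            is_future = causal and k > q       # look-ahead mask: hide later positions
            row.append(is_padding or is_future)
        mask.append(row)
    return mask


pe = sinusoidal_encoding(max_len=4, d_model=6)
mask = attention_mask([1, 7, 2, 0], causal=True)  # last token is padding
for row in mask:
    print(row)
```

In the encoder only the padding part applies (`causal=False`), while the decoder's masked self-attention combines both, so a position can attend only to earlier, non-padded positions.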

In [None]:
class TotalEmbeddings(LightningModule):

  def __init__(self, dict_size, sizes, embedding_size, padding_idx, p=0.1):
    super().__init__()
    
    # Variables Assignment
    self.dict_size = dict_size
    self.max_length = max(sizes[0], sizes[1])
    self.embedding_size = embedding_size
    self.padding_idx = padding_idx
    self.dropout_probability = p

    self.scale = math.sqrt(self.embedding_size)

    # Layer Initialization
    self.embedding_layer = nn.Embedding(self.dict_size, self.embedding_size, padding_idx=self.padding_idx, device=device)
    self.dropout = nn.Dropout(p=self.dropout_probability)
    
    # Sinusoidal Positional Embeddings Computation
    self.positional_embedding = self._initialize_pos_embedding().to(device)  # shape (max_len, embedding_size)

  def forward(self, input):
    input = input.to(device)  # shape (batch_size, input_len)
    char_embedding = self.embedding_layer(input)*self.scale
    return self.dropout(char_embedding + self.positional_embedding[:input.shape[1], :])  # shape (batch_size, input_len, embedding_size)

  def _initialize_pos_embedding(self):
    positional_embeddings = torch.zeros(self.max_length, self.embedding_size)  # shape (max_len, embedding_size)
    for pos in range(self.max_length):
      for i in range(0, self.embedding_size, 2):
        positional_embeddings[pos, i] = math.sin(pos / (10000 ** (i/self.embedding_size)))
        positional_embeddings[pos, i + 1] = math.cos(pos / (10000 ** (i/self.embedding_size)))
    return positional_embeddings  # shape (max_len, embedding_size)



In [None]:
class PositionwiseFFN(LightningModule):

  def __init__(self, embedding_size, hidden_units):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.hidden_units = hidden_units

    # Layers Initialization
    self.linear_1 = nn.Linear(self.embedding_size, self.hidden_units, device=device)
    self.linear_2 = nn.Linear(self.hidden_units, self.embedding_size, device=device)
    self.relu = nn.ReLU()

  def forward(self, input):
    output = self.linear_1(input)  # shape (batch_size, input_len, hidden_units)
    output = self.relu(output)
    output = self.linear_2(output)  # shape (batch_size, input_len, embedding_size)
    return output



In [None]:
class MultiHeadAttention(LightningModule):
  
  def __init__(self, embedding_size, num_heads, masked=False):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.masked = masked
    self.dk = self.dv = self.embedding_size//self.num_heads

    # Layers Initialization
    self.attention_heads = nn.ModuleList([AttentionHead(self.dk, self.dv, self.embedding_size, self.masked) for _ in range(self.num_heads)])
    self.Wo = nn.Linear(self.num_heads*self.dk, self.embedding_size, bias=False, device=device)  # shape (num_heads*dv, embedding_size)
  
  def forward(self, batch_keys, batch_queries, batch_values, padding_mask):  # input shape (batch_size, len_k/len_q/len_v, embedding_size)
    
    attention_heads_results = []
    
    for head in self.attention_heads:
      attention_heads_results.append(head(batch_keys, batch_queries, batch_values, padding_mask))  # shape (batch_size, len_q, dv)
    concatenated_results = torch.cat(attention_heads_results, 2)  # shape (batch_size, len_q, num_heads*dv)
    
    return self.Wo(concatenated_results) # shape (batch_size, len_q, embedding_size)


# We use the same notation as Vaswani et al. [3]: dk and dv are the dimensions into which we project the input to create the keys (and queries) and the values
class AttentionHead(LightningModule):
  
  def __init__(self, dk, dv, embedding_size, masked):
    super().__init__()
    
    # Variables Assignment
    self.dk = dk
    self.dv = dv
    self.masked = masked

    # Layers Initialization
    self.Wk = nn.Linear(embedding_size, dk, bias=False, device=device)  # shape (embedding_size, dk)
    self.Wq = nn.Linear(embedding_size, dk, bias=False, device=device)  # shape (embedding_size, dk)
    self.Wv = nn.Linear(embedding_size, dv, bias=False, device=device)  # shape (embedding_size, dv)
    
  
  def forward(self, batch_keys, batch_queries, batch_values, padding_mask):  # input shape (batch_size, len_k/len_q/len_v, embedding_size)
    
    keys = self.Wk(batch_keys)  # shape (batch_size, len_k, dk)
    queries = self.Wq(batch_queries)  # shape (batch_size, len_q, dk)
    values = self.Wv(batch_values)  # shape (batch_size, len_v, dv)

    unnormalized_attention_score = torch.matmul(queries, keys.transpose(1, 2)) / math.sqrt(self.dk)  #shape (batch_size, len_q, len_k)
    
    if self.masked:
      input_len_1 = batch_queries.shape[1]
      input_len_2 = batch_keys.shape[1]
      target_mask = torch.triu(torch.full((input_len_1, input_len_2), float("-inf"), device=device), diagonal=1)
      unnormalized_attention_score = unnormalized_attention_score + target_mask
    
    # padding_mask shape (batch_size, len_k)
    padding_mask = padding_mask.unsqueeze(1).repeat(1, batch_queries.shape[1], 1)  # shape (batch_size, len_q, len_k)
    unnormalized_attention_score = unnormalized_attention_score.masked_fill_(padding_mask, float("-inf"))

    attention_score = F.softmax(unnormalized_attention_score, 2)
    return torch.matmul(attention_score, values)  # shape (batch_size, len_q, dv)



In [None]:
class Decoder(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, num_layers, p=0.1):
    super().__init__()
    
    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.num_layers = num_layers
    self.dropout_probability = p

    # Layers Initialization
    self.layers = nn.ModuleList(
        [DecoderLayer(self.embedding_size, self.num_heads, self.hidden_units, p=self.dropout_probability) for _ in range(self.num_layers)])

  def forward(self, encoder_batch, outputs_batch, src_padding_mask, tgt_padding_mask):
    # encoder_batch shape (batch_size, encoder_len, embedding_size)
    # outputs_batch shape (batch_size, outputs_len, embedding_size)
    
    decoder_layer_output = outputs_batch

    for decoder_layer in self.layers:
      decoder_layer_output = decoder_layer(encoder_batch, decoder_layer_output, src_padding_mask, tgt_padding_mask)  # shape (batch_size, outputs_len, embedding_size)
    
    return decoder_layer_output  # shape (batch_size, outputs_len, embedding_size)

class DecoderLayer(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.dropout_probability = p

    # Layers Initialization
    self.ffn = PositionwiseFFN(self.embedding_size, self.hidden_units)

    self.mha = MultiHeadAttention(self.embedding_size, self.num_heads)
    self.masked_mha = MultiHeadAttention(self.embedding_size, self.num_heads, masked=True)
    
    self.layer_norm_1 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_2 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_3 = nn.LayerNorm(self.embedding_size, device=device)

    self.dropout = nn.Dropout(p=self.dropout_probability)
  
  
  def forward(self, encoder_batch, outputs_batch, src_padding_mask, tgt_padding_mask):
    # encoder_batch shape (batch_size, encoder_len, embedding_size)
    # outputs_batch shape (batch_size, outputs_len, embedding_size)

    masked_mha_output = self.masked_mha(outputs_batch, outputs_batch, outputs_batch, tgt_padding_mask) # shape (batch_size, outputs_len, embedding_size)
    masked_mha_output = self.dropout(masked_mha_output)
    masked_mha_output = self.layer_norm_1(masked_mha_output + outputs_batch)  # add & norm

    mha_output = self.mha(encoder_batch, masked_mha_output, encoder_batch, src_padding_mask)  # shape (batch_size, outputs_len, embedding_size)
    mha_output = self.dropout(mha_output)
    mha_output = self.layer_norm_2(mha_output + masked_mha_output)  # add & norm

    ffn_output = self.ffn(mha_output)  # shape (batch_size, outputs_len, embedding_size)
    ffn_output = self.dropout(ffn_output)
    ffn_output = self.layer_norm_3(ffn_output + mha_output)  # add & norm

    return ffn_output  # shape (batch_size, outputs_len, embedding_size)



In [None]:
class Encoder(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, num_layers, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.num_layers = num_layers
    self.dropout_probability = p

    # Layers Initialization
    self.layers = nn.ModuleList(
        [EncoderLayer(self.embedding_size, self.num_heads, self.hidden_units, p=self.dropout_probability) for _ in range(self.num_layers)])


  def forward(self, batch, src_padding_mask):  # input shape (batch_size, input_len, embedding_size)

    encoder_layer_output = batch

    for encoder_layer in self.layers:
      encoder_layer_output = encoder_layer(encoder_layer_output, src_padding_mask)  # shape (batch_size, input_len, embedding_size)
    
    return encoder_layer_output  # shape (batch_size, input_len, embedding_size)


class EncoderLayer(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.dropout_probability = p

    # Layers Initialization
    self.ffn = PositionwiseFFN(self.embedding_size, self.hidden_units)
    self.mha = MultiHeadAttention(self.embedding_size, self.num_heads)
    
    self.layer_norm_1 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_2 = nn.LayerNorm(self.embedding_size, device=device)

    self.dropout = nn.Dropout(p=self.dropout_probability)
  
  def forward(self, batch, src_padding_mask):  # input shape (batch_size, input_len, embedding_size)

    mha_output = self.mha(batch, batch, batch, src_padding_mask)  # shape (batch_size, input_len, embedding_size)
    mha_output = self.dropout(mha_output)
    mha_output = self.layer_norm_1(mha_output + batch)  # add & norm

    ffn_output = self.ffn(mha_output)  # shape (batch_size, input_len, embedding_size)
    ffn_output = self.dropout(ffn_output)
    ffn_output = self.layer_norm_2(ffn_output + mha_output)  # add & norm

    return ffn_output  # shape (batch_size, input_len, embedding_size)



In [None]:
class Transformer(LightningModule):
  
  def __init__(self, dict_size, sizes, dictionaries, 
               embedding_size = 512, hidden_units = 2048, 
               num_heads = 8, num_encoder_layers=6, 
               num_decoder_layers= 6, padding_idx=0, p=0.):
    
    super().__init__()

    # Variables Assignment
    self.dict_size = dict_size
    self.question_max_length = sizes[0]
    self.answer_max_length = sizes[1]
    self.max_len = max(sizes[0], sizes[1])
    self.padding_idx = padding_idx
    self.idx_to_char = dictionaries[1]  # dictionary index to char
    self.dropout_probability = p
    
    self.embedding_size = embedding_size
    self.hidden_units = hidden_units
    self.num_heads = num_heads
    self.num_encoder_layers = num_encoder_layers
    self.num_decoder_layers = num_decoder_layers
    
    # Layers Initialization
    self.total_embedding = TotalEmbeddings(self.dict_size, sizes, self.embedding_size, self.padding_idx, p=self.dropout_probability)

    self.encoder = Encoder(self.embedding_size, num_heads= self.num_heads, 
                           hidden_units=self.hidden_units, num_layers=self.num_encoder_layers, p=self.dropout_probability)
    
    self.decoder = Decoder(self.embedding_size, num_heads= self.num_heads, 
                           hidden_units=self.hidden_units, num_layers=self.num_decoder_layers, p=self.dropout_probability)
    

  def forward(self, batch):
    
    questions = (batch["question"]).to(device)  # shape (batch_size, question_max_length)
    answers = (batch["answer"]).to(device)  # shape (batch_size, answer_max_length-1)

    source_pad_mask = (questions == 0).to(device)  # shape (batch_size, question_max_length)
    target_pad_mask = (answers == 0).to(device)  # shape (batch_size, answer_max_length-1)

    embedded_questions = self.total_embedding(questions)  # shape (batch_size, question_max_length, embedding_size)
    embedded_answers = self.total_embedding(answers)  # shape (batch_size, answer_max_length-1, embedding_size)

    encoder_output = self.encoder(embedded_questions, src_padding_mask=source_pad_mask)  # shape (batch_size, question_max_length, embedding_size)
    decoder_output = self.decoder(encoder_output, embedded_answers, src_padding_mask=source_pad_mask, tgt_padding_mask=target_pad_mask)  # shape (batch_size, answer_max_length-1, embedding_size)

    # Softmax not used because of nn.CrossEntropyLoss
    return torch.matmul(decoder_output, torch.transpose(self.total_embedding.embedding_layer.weight, 0, 1))  # shape (batch_size, answer_max_length-1, dict_size)

  def training_step(self, batch, _):
    # Preparing inputs  
    batch_answers = batch["answer"][:, 1:].flatten(0, 1)  # shape (batch_size * answer_max_length-1)
    batch["answer"] = batch["answer"][:, :-1]  # shape (batch_size, answer_max_length-1)

    # Computing prediction and loss
    pred = self(batch).flatten(0, 1)  # shape (batch_size * answer_max_length-1, dict_size)
    loss = F.cross_entropy(pred, batch_answers, ignore_index=0)
    self.log("train_loss", loss)
    return loss

  def validation_step(self, batch, _):
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy 
    pred = self.predict(batch)  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper

    self.log("val_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    return accuracy

  def test_step(self, batch, batch_idx, dataset_idx):
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy
    pred = self.predict(batch.copy())  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper
    
    if (batch_idx == 0):
      print_correct(self.idx_to_char, batch, pred)

    self.log("test_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    return accuracy

  def predict(self, batch):  # input shape (batch_size, question_max_length)
    batch_size = len(batch["question"])
    # Every answer starts with the start-of-line token (index 1)
    start_line_char = torch.ones((batch_size, 1), dtype=torch.long, device=device)
    batch["answer"] = start_line_char

    # Greedy decoding: re-run the model on the tokens predicted so far and keep the argmax at each position
    for _ in range(self.answer_max_length-1):
      transformer_result = self(batch)  # shape (batch_size, current_len, dict_size)
      predicted_chars = torch.argmax(transformer_result, 2)  # shape (batch_size, current_len)
      batch["answer"] = torch.cat((start_line_char, predicted_chars), 1)  # shape (batch_size, current_len+1)

    return F.one_hot(batch["answer"], num_classes=self.dict_size)  # shape (batch_size, answer_max_length, dict_size)
  
  def print_predict(self, questions):
    pred = self.predict(questions.copy())
    print_batch(self.idx_to_char, questions, pred)
    return
  
  def configure_optimizers(self):
    return torch.optim.Adam(self.parameters(), lr=1e-4, betas=(0.9, 0.995))



## TP-Transformer
The third and final model we implemented is the current state of the art for the Mathematics Dataset: the Tensor-Product Transformer (TP-Transformer) [2]. Its architecture is the same as the standard Transformer except for the multi-head attention, which is replaced by TP multi-head attention (TPMHA). In this model, each head has a new Role vector in addition to the Key, Query, and Value vectors. We call the result of the standard attention head the Filler. Each head binds its filler to its role via the tensor product and then applies an affine transformation. Finally, the results of all heads are summed to form the Tensor-Product Representation of the structure. To control dimensionality, the SOTA paper [2] suggests replacing the full tensor product with pointwise (Hadamard) multiplication, which is a contraction of the tensor product.
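
As a compact summary of the description above, the computation of one TP attention head and of the whole TPMHA layer can be written as follows. This is a sketch that mirrors the `TPAttentionHead` and `TPMultiHeadAttention` classes of this notebook rather than a quotation from [2]. The symbols are notation assumed here: $X$ is the input that produces queries and roles, $Y$ is the input that produces keys and values, $H$ is the number of heads, and $d_k$ is the per-head dimension. The padding and causal masks, which are applied to the scores before the softmax, are omitted for brevity.

$$
\begin{aligned}
Q_h &= X W^{Q}_h + b^{Q}_h \\
K_h &= Y W^{K}_h + b^{K}_h \\
V_h &= Y W^{V}_h + b^{V}_h \\
R_h &= X W^{R}_h + b^{R}_h \\
F_h &= \operatorname{softmax}\left(\frac{Q_h K_h^{\top}}{\sqrt{d_k}}\right) V_h \\
\operatorname{TPMHA}(X, Y) &= \sum_{h=1}^{H} \left[ (F_h \odot R_h)\, W^{O}_h + b^{O}_h \right]
\end{aligned}
$$

The Hadamard product is a contraction of the tensor product in the following sense: for a filler $f \in \mathbb{R}^{d_k}$ and a role $r \in \mathbb{R}^{d_k}$,

$$
(f \otimes r)_{ij} = f_i\, r_j, \qquad (f \odot r)_i = (f \otimes r)_{ii} = f_i\, r_i ,
$$

so keeping only the diagonal of the $d_k \times d_k$ tensor product reduces the binding from $d_k^2$ to $d_k$ components.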

In [None]:
class TPTotalEmbeddings(LightningModule):

  def __init__(self, dict_size, sizes, embedding_size, padding_idx, p=0.1):
    super().__init__()
    
    # Variables Assignment
    self.dict_size = dict_size
    self.max_length = max(sizes[0], sizes[1])
    self.embedding_size = embedding_size
    self.padding_idx = padding_idx
    self.dropout_probability = p

    self.scale = math.sqrt(self.embedding_size)

    # Layer Initialization
    self.embedding_layer = nn.Embedding(self.dict_size, self.embedding_size, padding_idx=self.padding_idx, device=device)
    self.dropout = nn.Dropout(p=self.dropout_probability)
    
    # Sinusoidal Positional Embeddings Computation
    self.positional_embedding = self._initialize_pos_embedding().to(device)  # shape (max_len, embedding_size)

  def forward(self, input):
    input = input.to(device)  # shape (batch_size, input_len)
    char_embedding = self.embedding_layer(input)*self.scale
    return self.dropout(char_embedding + self.positional_embedding[:input.shape[1], :])  # shape (batch_size, input_len, embedding_size)


  def _initialize_pos_embedding(self):
    positional_embeddings = torch.zeros(self.max_length, self.embedding_size)  # shape (max_len, embedding_size)
    for pos in range(self.max_length):
      for i in range(0, self.embedding_size, 2):
        positional_embeddings[pos, i] = math.sin(pos / (10000 ** (i/self.embedding_size)))
        positional_embeddings[pos, i + 1] = math.cos(pos / (10000 ** (i/self.embedding_size)))
    return positional_embeddings  # shape (max_len, embedding_size)



In [None]:
class TPPositionwiseFFN(LightningModule):

  def __init__(self, embedding_size, hidden_units):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.hidden_units = hidden_units

    # Layers Initialization
    self.linear_1 = nn.Linear(self.embedding_size, self.hidden_units, device=device)
    self.linear_2 = nn.Linear(self.hidden_units, self.embedding_size, device=device)
    self.relu = nn.ReLU()

  def forward(self, input):
    output = self.linear_1(input)  # shape (batch_size, input_len, hidden_units)
    output = self.relu(output)
    output = self.linear_2(output)  # shape (batch_size, input_len, embedding_size)
    return output



In [None]:
class TPMultiHeadAttention(LightningModule):
  
  def __init__(self, embedding_size, num_heads, masked=False):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.masked = masked
    self.dk = self.embedding_size//self.num_heads

    # Layers Initialization
    self.attention_heads = nn.ModuleList([TPAttentionHead(self.dk, self.embedding_size, self.masked) for _ in range(self.num_heads)])
  
  def forward(self, batch_keys, batch_queries, batch_values, padding_mask):  # input shape (batch_size, len_k/len_q/len_v, embedding_size)
    
    attention_heads_results = []
    for head in self.attention_heads:
      attention_heads_results.append(head(batch_keys, batch_queries, batch_values, padding_mask))  # shape (batch_size, len_q, embedding_size)
    
    return torch.sum(torch.stack(attention_heads_results), dim=0)  # shape (batch_size, len_q, embedding_size)


# We use the same notation as Vaswani et al. [3]: dk is the dimension into which we project the input to create the queries, keys, values and roles
class TPAttentionHead(LightningModule):
  
  def __init__(self, dk, embedding_size, masked):
    super().__init__()
    
    # Variables Assignment
    self.dk = dk
    self.masked = masked
    self.embedding_size = embedding_size

    # Layers Initialization
    self.Wk = nn.Linear(embedding_size, dk, device=device)  # shape (embedding_size, dk)
    self.Wq = nn.Linear(embedding_size, dk, device=device)  # shape (embedding_size, dk)
    self.Wv = nn.Linear(embedding_size, dk, device=device)  # shape (embedding_size, dk)
    self.Wr = nn.Linear(embedding_size, dk, device=device)  # shape (embedding_size, dk)
    self.Wo = nn.Linear(dk, embedding_size, device=device)  # shape (dk, embedding_size)
  
  def forward(self, batch_keys, batch_queries, batch_values, padding_mask):  # input shape (batch_size, len_k/len_q/len_v, embedding_size)
    
    keys = self.Wk(batch_keys)  # shape (batch_size, len_k, dk)
    queries = self.Wq(batch_queries)  # shape (batch_size, len_q, dk)
    values = self.Wv(batch_values)  # shape (batch_size, len_v, dk)
    roles = self.Wr(batch_queries)  # shape (batch_size, len_q, dk)

    unnormalized_attention_score = torch.matmul(queries, keys.transpose(1, 2)) / math.sqrt(self.dk)  # shape (batch_size, len_q, len_k)
    
    if self.masked:
      input_len_1 = batch_queries.shape[1]
      input_len_2 = batch_keys.shape[1]
      target_mask = torch.triu(torch.full((input_len_1, input_len_2), float("-inf"), device= device), diagonal=1)
      unnormalized_attention_score = unnormalized_attention_score + target_mask
    
    # padding_mask shape (batch_size, len_k)
    padding_mask = padding_mask.unsqueeze(1).repeat(1, batch_queries.shape[1], 1)  # shape (batch_size, len_q, len_k)
    unnormalized_attention_score = unnormalized_attention_score.masked_fill_(padding_mask, float("-inf"))

    attention_score = F.softmax(unnormalized_attention_score, 2)
    filler = torch.matmul(attention_score, values)  # shape (batch_size, len_q, dk)

    return self.Wo(filler * roles)  # Hadamard Product # shape (batch_size, len_q, embedding_size)



In [None]:
class TPDecoder(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, num_layers, p=0.1):
    super().__init__()
    
    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.num_layers = num_layers
    self.dropout_probability = p

    # Layers Initialization
    self.layers = nn.ModuleList(
        [TPDecoderLayer(self.embedding_size, self.num_heads, self.hidden_units, p=self.dropout_probability) for _ in range(self.num_layers)])

  def forward(self, encoder_batch, outputs_batch, src_padding_mask, tgt_padding_mask):
    # encoder_batch shape (batch_size, encoder_len, embedding_size)
    # outputs_batch shape (batch_size, outputs_len, embedding_size)
    
    decoder_layer_output = outputs_batch

    for decoder_layer in self.layers:
      decoder_layer_output = decoder_layer(encoder_batch, decoder_layer_output, src_padding_mask, tgt_padding_mask)  # shape (batch_size, outputs_len, embedding_size)
    
    return decoder_layer_output  # shape (batch_size, outputs_len, embedding_size)


class TPDecoderLayer(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.dropout_probability = p

    # Layers Initialization
    self.ffn = TPPositionwiseFFN(self.embedding_size, self.hidden_units)

    self.mha = TPMultiHeadAttention(self.embedding_size, self.num_heads)
    self.masked_mha = TPMultiHeadAttention(self.embedding_size, self.num_heads, masked=True)
    
    self.layer_norm_1 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_2 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_3 = nn.LayerNorm(self.embedding_size, device=device)

    self.dropout = nn.Dropout(p=self.dropout_probability)
  
  def forward(self, encoder_batch, outputs_batch, src_padding_mask, tgt_padding_mask):
    # encoder_batch shape (batch_size, encoder_len, embedding_size)
    # outputs_batch shape (batch_size, outputs_len, embedding_size)

    masked_mha_output = self.masked_mha(outputs_batch, outputs_batch, outputs_batch, tgt_padding_mask)  # shape (batch_size, outputs_len, embedding_size)
    masked_mha_output = self.dropout(masked_mha_output)
    masked_mha_output = self.layer_norm_1(masked_mha_output + outputs_batch)  # add & norm

    mha_output = self.mha(encoder_batch, masked_mha_output, encoder_batch, src_padding_mask)  # shape (batch_size, outputs_len, embedding_size)
    mha_output = self.dropout(mha_output)
    mha_output = self.layer_norm_2(mha_output + masked_mha_output)  # add & norm

    ffn_output = self.ffn(mha_output)  # shape (batch_size, outputs_len, embedding_size)
    ffn_output = self.dropout(ffn_output)
    ffn_output = self.layer_norm_3(ffn_output + mha_output)  # add & norm

    return ffn_output   # shape (batch_size, outputs_len, embedding_size)



In [None]:
class TPEncoder(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, num_layers, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.num_layers = num_layers
    self.dropout_probability = p

    # Layers Initialization
    self.layers = nn.ModuleList(
        [TPEncoderLayer(self.embedding_size, self.num_heads, self.hidden_units, p=self.dropout_probability) for _ in range(self.num_layers)])

  def forward(self, batch, src_padding_mask):  # input shape (batch_size, input_len, embedding_size)

    encoder_layer_output = batch

    for encoder_layer in self.layers:
      encoder_layer_output = encoder_layer(encoder_layer_output, src_padding_mask)  # shape (batch_size, input_len, embedding_size)
    
    return encoder_layer_output  # shape (batch_size, input_len, embedding_size)

class TPEncoderLayer(LightningModule):
  
  def __init__(self, embedding_size, num_heads, hidden_units, p=0.1):
    super().__init__()

    # Variables Assignment
    self.embedding_size = embedding_size
    self.num_heads = num_heads
    self.hidden_units = hidden_units
    self.dropout_probability = p

    # Layers Initialization
    self.ffn = TPPositionwiseFFN(self.embedding_size, self.hidden_units)
    self.mha = TPMultiHeadAttention(self.embedding_size, self.num_heads)
    
    self.layer_norm_1 = nn.LayerNorm(self.embedding_size, device=device)
    self.layer_norm_2 = nn.LayerNorm(self.embedding_size, device=device)

    self.dropout = nn.Dropout(p=self.dropout_probability)
  
  def forward(self, batch, src_padding_mask):  # input shape (batch_size, input_len, embedding_size)

    mha_output = self.mha(batch, batch, batch, src_padding_mask)  # shape (batch_size, input_len, embedding_size)
    mha_output = self.dropout(mha_output)
    mha_output = self.layer_norm_1(mha_output + batch)  # add & norm

    ffn_output = self.ffn(mha_output)  # shape (batch_size, input_len, embedding_size)
    ffn_output = self.dropout(ffn_output)
    ffn_output = self.layer_norm_2(ffn_output + mha_output)  # add & norm

    return ffn_output  # shape (batch_size, input_len, embedding_size)



In [None]:
class TPTransformer(LightningModule):
  
  def __init__(self, dict_size, sizes, dictionaries, 
               embedding_size = 512, hidden_units = 2048, 
               num_heads = 8, num_encoder_layers=6, 
               num_decoder_layers= 6, padding_idx=0, p=0.):
    
    super().__init__()

    # Variables Assignment
    self.dict_size = dict_size
    self.question_max_length = sizes[0]
    self.answer_max_length = sizes[1]
    self.max_len = max(sizes[0], sizes[1])
    self.padding_idx = padding_idx
    self.idx_to_char = dictionaries[1]  # dictionary index to char
    self.dropout_probability = p
    
    self.embedding_size = embedding_size
    self.hidden_units = hidden_units
    self.num_heads = num_heads
    self.num_encoder_layers = num_encoder_layers
    self.num_decoder_layers = num_decoder_layers
    
    # Layers Initialization
    self.total_embedding = TPTotalEmbeddings(self.dict_size, sizes, self.embedding_size, self.padding_idx, p=self.dropout_probability)

    self.encoder = TPEncoder(self.embedding_size, num_heads= self.num_heads, 
                           hidden_units=self.hidden_units, num_layers=self.num_encoder_layers, p=self.dropout_probability)
    
    self.decoder = TPDecoder(self.embedding_size, num_heads= self.num_heads, 
                           hidden_units=self.hidden_units, num_layers=self.num_decoder_layers, p=self.dropout_probability)
    
  def forward(self, batch):
    
    questions = (batch["question"]).to(device)  # shape (batch_size, question_max_length)
    answers = (batch["answer"]).to(device)  # shape (batch_size, answer_max_length-1)

    source_pad_mask = (questions == 0).to(device)  # shape (batch_size, question_max_length)
    target_pad_mask = (answers == 0).to(device)  # shape (batch_size, answer_max_length-1)

    embedded_questions = self.total_embedding(questions)  # shape (batch_size, question_max_length, embedding_size)
    embedded_answers = self.total_embedding(answers)  # shape (batch_size, answer_max_length-1, embedding_size)

    encoder_output = self.encoder(embedded_questions, src_padding_mask=source_pad_mask)  # shape (batch_size, question_max_length, embedding_size)
    decoder_output = self.decoder(encoder_output, embedded_answers, src_padding_mask=source_pad_mask, tgt_padding_mask=target_pad_mask)  # shape (batch_size, answer_max_length-1, embedding_size)

    # Softmax not used because of nn.CrossEntropyLoss
    return torch.matmul(decoder_output, torch.transpose(self.total_embedding.embedding_layer.weight, 0, 1))  # shape (batch_size, answer_max_length-1, dict_size)

  def training_step(self, batch, _):
    # Preparing inputs  
    batch_answers = batch["answer"][:, 1:].flatten(0, 1)  # shape (batch_size * answer_max_length-1)
    batch["answer"] = batch["answer"][:, :-1]  # shape (batch_size, answer_max_length-1)

    # Computing prediction and loss
    pred = self(batch).flatten(0, 1)  # shape (batch_size * answer_max_length-1, dict_size)
    loss = F.cross_entropy(pred, batch_answers, ignore_index=0)
    self.log("train_loss", loss)
    return loss

  def validation_step(self, batch, _):
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy 
    pred = self.predict(batch)  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper

    self.log("val_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    return accuracy

  def test_step(self, batch, batch_idx, dataset_idx):
    batch_answers = batch["answer"]  # shape (batch_size, answer_max_length)

    # Computing prediction and accuracy
    pred = self.predict(batch.copy())  # shape (batch_size, answer_max_length, dict_size)
    accuracy = paper_accuracy(pred, batch_answers)  # accuracy for the current batch as defined in the "mathematics dataset" paper
    
    if (batch_idx == 0):
      print_correct(self.idx_to_char, batch, pred)

    self.log("test_tot_accuracy", accuracy)  # at the end of every epoch it logs the average of the accuracies of each batch
    return accuracy

  def predict(self, batch):  # input shape (batch_size, question_max_length)
    batch_size = len(batch["question"])
    # Every answer starts with the start-of-line token (index 1)
    start_line_char = torch.ones((batch_size, 1), dtype=torch.long, device=device)
    batch["answer"] = start_line_char

    # Greedy decoding: re-run the model on the tokens predicted so far and keep the argmax at each position
    for _ in range(self.answer_max_length-1):
      transformer_result = self(batch)  # shape (batch_size, current_len, dict_size)
      predicted_chars = torch.argmax(transformer_result, 2)  # shape (batch_size, current_len)
      batch["answer"] = torch.cat((start_line_char, predicted_chars), 1)  # shape (batch_size, current_len+1)

    return F.one_hot(batch["answer"], num_classes=self.dict_size)  # shape (batch_size, answer_max_length, dict_size)
  
  def print_predict(self, questions):
    pred = self.predict(questions.copy())
    print_batch(self.idx_to_char, questions, pred)
    return
  
  def configure_optimizers(self):
    return torch.optim.Adam(self.parameters(), lr=1e-4, betas=(0.9, 0.995))



# Train a model
This section contains the code to train a model.

For this section to work properly, you must run the following sections:
- Imports
- Dataset and Preprocessing
- Model definitions

## LSTM
We chose a subset of the dataset composed of three modules (numbers round, calculus differentiate, polynomials evaluate) for a total of 6 million samples.

We trained this model for 10 epochs using the Adam optimizer, the cross-entropy loss function, a batch size of 1024, a learning rate of 6e−4 and other hyperparameters as suggested in the dataset paper [1].
We obtained an accuracy of 0.560 for the interpolation test set and 0.806 for the extrapolation test set.

In [None]:
# Initialize the dataset
mdm = MathDataModule(chosen_dataset=0, batch_size=256)
mdm.prepare_data()
mdm.setup("fit")

In [None]:
# Initialize the model
model = LSTM(mdm.get_dictionary_size(), mdm.get_max_lengths(), mdm.get_dictionaries())

In [None]:
# Set up logger
wandb.login()
wandb.init(project="DL")
wandb.watch(model, log_freq=10)
wandb_logger = WandbLogger(project="DL")

In [None]:
# Initialize the trainer
trainer = Trainer(max_epochs=10, accelerator="gpu", logger=wandb_logger, log_every_n_steps=10)

# Training the model
trainer.fit(model, train_dataloaders=mdm.train_dataloader(), val_dataloaders=mdm.val_dataloader())

# Stop logging
wandb.finish()

In [None]:
trainer.save_checkpoint("/content/LSTM.ckpt")
# Remember to also save the dictionaries
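
As the comment above notes, the character dictionaries need to be stored alongside the checkpoint, otherwise they have to be hard-coded when the model is reloaded. The following is a minimal sketch of one way to do this with JSON, assuming the dictionaries are plain Python `dict`s like the ones written out in the testing section. The helper names `save_dictionaries` and `load_dictionaries` and the file path are hypothetical and not part of this notebook.

```python
import json


def save_dictionaries(char_to_idx, idx_to_char, path):
    """Write both dictionaries to a single JSON file (hypothetical helper)."""
    payload = {
        "char_to_idx": char_to_idx,
        # JSON object keys must be strings, so the integer indices are converted
        "idx_to_char": {str(idx): char for idx, char in idx_to_char.items()},
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False)


def load_dictionaries(path):
    """Read the dictionaries back, restoring the integer keys of idx_to_char."""
    with open(path, encoding="utf-8") as f:
        payload = json.load(f)
    char_to_idx = payload["char_to_idx"]
    idx_to_char = {int(idx): char for idx, char in payload["idx_to_char"].items()}
    return char_to_idx, idx_to_char
```

For example, `save_dictionaries(*mdm.get_dictionaries(), "/content/dictionaries.json")` would work if `get_dictionaries()` returns the pair `(char_to_idx, idx_to_char)`, which is an assumption here. The maximum question and answer lengths could be stored in the same file in the same way.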

## Transformer
We chose a subset of the dataset composed of three modules (numbers round, calculus differentiate, polynomials evaluate) for a total of 6 million samples.

We trained this model for 3 epochs using the Adam optimizer, the cross-entropy loss function, a batch size of 512, a learning rate of 1e−4 and other hyperparameters as suggested in the dataset paper [1]. We obtained an accuracy of 0.663 for the interpolation test set and 0.913 for the extrapolation test set.

In [None]:
# Initialize the dataset 
mdm = MathDataModule(chosen_dataset=0, batch_size=256)
mdm.prepare_data()
mdm.setup("fit")

In [None]:
# Initialize the model
model = Transformer(mdm.get_dictionary_size(), mdm.get_max_lengths(), mdm.get_dictionaries(), p=0.)

In [None]:
# Set up logger
wandb.login()
wandb.init(project="DL")
wandb.watch(model, log_freq=10)
wandb_logger = WandbLogger(project="DL")

In [None]:
# Initialize the trainer
trainer = Trainer(max_epochs=3, accelerator="gpu", logger=wandb_logger, log_every_n_steps=10)

# Training the model 
trainer.fit(model, train_dataloaders=mdm.train_dataloader(), val_dataloaders=mdm.val_dataloader())

# Stop logging
wandb.finish()

In [None]:
trainer.save_checkpoint("/content/TRANSFORMER.ckpt")
# Remember to also save the dictionaries

## TP-Transformer
We chose a subset of the dataset composed of three modules (numbers round, calculus differentiate, polynomials evaluate) for a total of 6 million samples.

We trained this model for 3 epochs using the Adam optimizer, the cross-entropy loss function, a batch size of 512, a learning rate of 1e−4 and other hyperparameters as suggested in the SOTA paper [2] (embedding size 256 and dimension of the feedforward networks 2048). We obtained an accuracy of 0.684 for the interpolation test set and 0.957 for the extrapolation test set.

In [None]:
# Initialize the dataset 
mdm = MathDataModule(chosen_dataset=0, batch_size=256)
mdm.prepare_data()
mdm.setup("fit")

In [None]:
# Initialize the model
model = TPTransformer(mdm.get_dictionary_size(), mdm.get_max_lengths(), mdm.get_dictionaries(), p=0.)

In [None]:
# Set up logger 
wandb.login()
wandb.init(project="DL")
wandb.watch(model, log_freq=10)
wandb_logger = WandbLogger(project="DL")

In [None]:
# Initialize the trainer
trainer = Trainer(max_epochs=3, accelerator="gpu", logger=wandb_logger, log_every_n_steps=10)

# Training the model
trainer.fit(model, train_dataloaders=mdm.train_dataloader(), val_dataloaders=mdm.val_dataloader())

# Stop logging
wandb.finish()

In [None]:
trainer.save_checkpoint("/content/TPTRANSFORMER.ckpt")
# Remember to also save the dictionaries

# Download pretrained models

In [None]:
!gdown 1k2TzLZIyHJSPO2M8UQFIgcvN2gtLpKVF -O /content/LSTM.ckpt

In [None]:
!gdown 1_Pk9mCA5CvdV68u0saoqKaTBpnTNuEOK -O /content/TRANSFORMER.ckpt

In [None]:
!gdown 1lM9UrljBcFH7ZUS6BfoVLHIF6CtTtLLI -O /content/TPTRANSFORMER.ckpt

# Import a pretrained model and test it
This section contains the code to test a pretrained model.

For this section to work properly, you must run the following sections:
- Imports
- Dataset and Preprocessing
- Model definitions
- Download pretrained models

In [None]:
mdm = MathDataModule(chosen_dataset=4)
mdm.prepare_data()
mdm.question_max_length = 162
mdm.answer_max_length = 32
mdm.char_to_idx = {'&': 0, '#': 1, '@': 2, '{': 3, '"': 4, 'q': 5, 'u': 6, 'e': 7, 's': 8, 't': 9, 'i': 10, 'o': 11, 'n': 12, ':': 13, ' ': 14, 'R': 15, 'd': 16, '-': 17, '0': 18, '.': 19, '7': 20, '1': 21, '4': 22, '5': 23, 'p': 24, ',': 25, 'a': 26, 'w': 27, 'r': 28, '}': 29, '6': 30, '8': 31, '2': 32, 'h': 33, 'W': 34, 'c': 35, 'm': 36, 'l': 37, '?': 38, '9': 39, '3': 40, 'z': 41, 'f': 42, 'v': 43, 'x': 44, 'g': 45, 'L': 46, '(': 47, ')': 48, '=': 49, '*': 50, 'D': 51, '+': 52, 'G': 53, 'b': 54, 'y': 55, 'C': 56, 'j': 57, 'k': 58, 'F': 59}
mdm.idx_to_char = {0: '&', 1: '#', 2: '@', 3: '{', 4: '"', 5: 'q', 6: 'u', 7: 'e', 8: 's', 9: 't', 10: 'i', 11: 'o', 12: 'n', 13: ':', 14: ' ', 15: 'R', 16: 'd', 17: '-', 18: '0', 19: '.', 20: '7', 21: '1', 22: '4', 23: '5', 24: 'p', 25: ',', 26: 'a', 27: 'w', 28: 'r', 29: '}', 30: '6', 31: '8', 32: '2', 33: 'h', 34: 'W', 35: 'c', 36: 'm', 37: 'l', 38: '?', 39: '9', 40: '3', 41: 'z', 42: 'f', 43: 'v', 44: 'x', 45: 'g', 46: 'L', 47: '(', 48: ')', 49: '=', 50: '*', 51: 'D', 52: '+', 53: 'G', 54: 'b', 55: 'y', 56: 'C', 57: 'j', 58: 'k', 59: 'F'}
mdm.setup("test")

In [None]:
#LSTM
model = LSTM.load_from_checkpoint("/content/LSTM.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())
trainer = Trainer(accelerator="gpu")
trainer.test(model, dataloaders=mdm.test_dataloader())

In [None]:
#Transformer
model = Transformer.load_from_checkpoint("/content/TRANSFORMER.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())
trainer = Trainer(accelerator="gpu")
trainer.test(model, dataloaders=mdm.test_dataloader())

In [None]:
#TP-Transformer
model = TPTransformer.load_from_checkpoint("/content/TPTRANSFORMER.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())
trainer = Trainer(accelerator="gpu")
trainer.test(model, dataloaders=mdm.test_dataloader())

# Import a pretrained model and predict
This section contains the code to make a prediction using a pretrained model.

For this section to work properly, you must run the following sections:
- Imports
- Dataset and Preprocessing
- Model definitions
- Download pretrained models

In [None]:
mdm = MathDataModule()
mdm.question_max_length = 162
mdm.answer_max_length = 32
mdm.char_to_idx = {'&': 0, '#': 1, '@': 2, '{': 3, '"': 4, 'q': 5, 'u': 6, 'e': 7, 's': 8, 't': 9, 'i': 10, 'o': 11, 'n': 12, ':': 13, ' ': 14, 'R': 15, 'd': 16, '-': 17, '0': 18, '.': 19, '7': 20, '1': 21, '4': 22, '5': 23, 'p': 24, ',': 25, 'a': 26, 'w': 27, 'r': 28, '}': 29, '6': 30, '8': 31, '2': 32, 'h': 33, 'W': 34, 'c': 35, 'm': 36, 'l': 37, '?': 38, '9': 39, '3': 40, 'z': 41, 'f': 42, 'v': 43, 'x': 44, 'g': 45, 'L': 46, '(': 47, ')': 48, '=': 49, '*': 50, 'D': 51, '+': 52, 'G': 53, 'b': 54, 'y': 55, 'C': 56, 'j': 57, 'k': 58, 'F': 59}
mdm.idx_to_char = {0: '&', 1: '#', 2: '@', 3: '{', 4: '"', 5: 'q', 6: 'u', 7: 'e', 8: 's', 9: 't', 10: 'i', 11: 'o', 12: 'n', 13: ':', 14: ' ', 15: 'R', 16: 'd', 17: '-', 18: '0', 19: '.', 20: '7', 21: '1', 22: '4', 23: '5', 24: 'p', 25: ',', 26: 'a', 27: 'w', 28: 'r', 29: '}', 30: '6', 31: '8', 32: '2', 33: 'h', 34: 'W', 35: 'c', 36: 'm', 37: 'l', 38: '?', 39: '9', 40: '3', 41: 'z', 42: 'f', 43: 'v', 44: 'x', 45: 'g', 46: 'L', 47: '(', 48: ')', 49: '=', 50: '*', 51: 'D', 52: '+', 53: 'G', 54: 'b', 55: 'y', 56: 'C', 57: 'j', 58: 'k', 59: 'F'}

In [None]:
#LSTM
model = LSTM.load_from_checkpoint("/content/LSTM.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())

In [None]:
#Transformer
model = Transformer.load_from_checkpoint("/content/TRANSFORMER.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())

In [None]:
#TP-Transformer
model = TPTransformer.load_from_checkpoint("/content/TPTRANSFORMER.ckpt", dict_size=mdm.get_dictionary_size(), sizes=mdm.get_max_lengths(), dictionaries=mdm.get_dictionaries())

In [None]:
model.eval()
model = model.to(device)

In [None]:
#To test a new question add it to the list
questions = []
questions.append("Let t(x) = -x**2 + 3*x - 3. Calculate t(3).")  # correct: -3
questions.append("What is -0.0006832 rounded to 5 decimal places?")  # correct: -0.00068
questions.append("Find the first derivative of 2*d**4 - 35*d**2 - 695 wrt d.")  # correct: 8*d**3 - 70*d

for question in questions:
  encoded = mdm.encode_question(question)
  model.print_predict(encoded)

# Conclusions
Looking at the experimental results, we found that the Transformer and TP-Transformer models take longer than the LSTM to complete an epoch, but they need fewer epochs to reach similar levels of accuracy.
Moreover, as we expected, the TP-Transformer performed better than the other models on both the interpolation and the extrapolation test sets.
However, we believe that our results are strongly limited by the lack of adequate computational resources, and we expect that additional training would improve them further.

# References
[1] David Saxton et al. Analysing Mathematical Reasoning Abilities of Neural Models. 2019. doi: 10.48550/ARXIV.1904.01557. url: https://arxiv.org/abs/1904.01557.

[2] Imanol Schlag et al. Enhancing the Transformer with Explicit Relational Encoding for Math Problem Solving. 2019. doi: 10.48550/ARXIV.1910.06611. url: https://arxiv.org/abs/1910.06611.

[3] Ashish Vaswani et al. Attention Is All You Need. 2017. doi: 10.48550/ARXIV.1706.03762. url: https://arxiv.org/abs/1706.03762.

# Appendix: WandB Report
Run the code cell below to view the WandB report, which contains charts of the training loss and the validation accuracy.

In [None]:
%wandb project_dl/DL/reports/Deep-Learning-Project--VmlldzozNDk3Njc4 -h 720