In [2]:
from transformers import T5Tokenizer, T5ForConditionalGeneration
import pytorch_lightning as pl
from sklearn.model_selection import train_test_split
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers.tensorboard import TensorBoardLogger
from torch.utils.data import Dataset
import pandas as pd
import torch.nn as nn
import torch
from torch.utils.data import DataLoader

# Read the training spreadsheet and discard the 'Table 1' column.
train_dataset_name = 'ArithOpsTrain.xlsx'
df = pd.read_excel(train_dataset_name).drop(columns='Table 1')
# Use the values of the first row as column names, then keep the rows from
# index label 1 onward.
df = df.rename(columns=df.iloc[0]).loc[1:]

# Hold out 10% of the rows for validation; the fixed seed keeps the split
# reproducible.
train_df, valid_df = train_test_split(
    df,
    test_size=0.1,
    random_state=0,
)

In [3]:
# Preview the first rows of the training split.
train_df.head()

Unnamed: 0,Description,Question,Equation,Input Numbers,Output
373,mrs. hilt is baking bread . she needs number0 ...,how much flour will she need to make number2 l...,/ number0 number1,5 2 1,2.5
901,robin had number0 songs on her number1 player ...,how many songs does she have on her number4 pl...,+ - number0 number2 number3,30 3 8 10 3,32.0
254,there are number0 more sections that are undev...,what is the total area of the undeveloped land ?,* number0 number1,3 2435,7305.0
468,mom made number0 chocolate chip cookies . it t...,how many cookies were left ?,- number0 number3,32 24 16 9,23.0
333,number0 children are taking a bus to the zoo ....,how many seats will the children need in all ?,/ number0 number1,58 2,29.0


In [None]:
class T5Dataset(Dataset):
    """Map-style dataset that tokenizes word problems and their target equations.

    Each row of ``data`` must provide the columns ``"Description"``,
    ``"Question"`` and ``"Equation"``. The model input is
    ``Description + " [SEP] " + Question``; the target is ``Equation``.

    Args:
        data: Frame holding one problem per row.
        tokenizer: Callable tokenizer (used like a Hugging Face tokenizer) that
            returns a mapping with ``input_ids`` and ``attention_mask``.
        text_max_token_length: Padded/truncated length of the input encoding.
        output_max_token_length: Padded/truncated length of the target encoding.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: "T5Tokenizer",
        text_max_token_length=512,
        output_max_token_length=128,
    ):
        super().__init__()
        self.tokenizer = tokenizer
        self.data = data
        # NOTE: a trailing comma here used to turn this into the tuple (512,),
        # which was then handed to the tokenizer as ``max_length``.
        self.text_max_token_length = text_max_token_length
        self.output_max_token_length = output_max_token_length

    def __len__(self):
        """Return the number of rows in the underlying frame."""
        return len(self.data)

    def _encode(self, text, max_length):
        """Tokenize ``text`` to exactly ``max_length`` tokens (pad or truncate)."""
        return self.tokenizer(
            text,
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors='pt',
        )

    def __getitem__(self, index):
        """Return the raw texts and the flattened encodings for row ``index``.

        Returns:
            dict with keys ``input_text`` (the description only),
            ``output_text``, ``input_text_ids``, ``input_attention_mask``,
            ``output_text_ids`` and ``output_attention_mask``.
        """
        data_row = self.data.iloc[index]

        input_text = data_row["Description"]
        input_question = data_row["Question"]
        output_text = data_row["Equation"]

        # Description and question are joined by the literal "[SEP]" marker.
        in_text = input_text + " [SEP] " + input_question

        input_text_encoding = self._encode(in_text, self.text_max_token_length)
        output_text_encoding = self._encode(output_text, self.output_max_token_length)

        # The tokenizer returns (1, length) tensors; flatten() drops the
        # leading dimension so the DataLoader can stack samples into a batch.
        return dict(
            input_text=input_text,
            output_text=output_text,
            input_text_ids=input_text_encoding['input_ids'].flatten(),
            input_attention_mask=input_text_encoding['attention_mask'].flatten(),
            output_text_ids=output_text_encoding['input_ids'].flatten(),
            output_attention_mask=output_text_encoding['attention_mask'].flatten(),
        )

In [None]:
# Load the pretrained "t5-small" tokenizer and register "[SEP]" (the marker
# T5Dataset places between description and question) as an additional
# special token.
t5_tokenizer = T5Tokenizer.from_pretrained("t5-small")
special_tokens_dict = {'additional_special_tokens' : ['[SEP]']}
num_added_tokens = t5_tokenizer.add_special_tokens(special_tokens_dict)

# NOTE(review): neither `t5_model` nor `dataset` is referenced again in the
# visible cells (T5ArithTranslator loads its own copy of the model) — confirm
# whether they are still needed.
t5_model =T5ForConditionalGeneration.from_pretrained("t5-small")
dataset = T5Dataset(train_df,t5_tokenizer)

In [None]:
# Wrap both splits in tokenizing datasets.
train_dataset = T5Dataset(train_df, t5_tokenizer)
valid_dataset = T5Dataset(valid_df, t5_tokenizer)

# Batches of 32; only the training loader reshuffles its samples.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

In [None]:
from torch.utils.data import DataLoader

class T5DataSetModule(pl.LightningDataModule):
    """LightningDataModule that serves the train/validation ``T5Dataset`` splits.

    Args:
        train_df: Frame with the training rows.
        valid_df: Frame with the validation rows.
        tokenizer: Tokenizer forwarded to ``T5Dataset``.
        batch_size: Batch size used by both dataloaders.
        text_max_token_length: Input length forwarded to ``T5Dataset``.
        output_max_token_length: Target length forwarded to ``T5Dataset``.
    """

    def __init__(
        self,
        train_df : pd.DataFrame,
        valid_df : pd.DataFrame,
        tokenizer : T5Tokenizer,
        batch_size = 32,
        text_max_token_length = 512,
        output_max_token_length = 128
    ):
        super().__init__()
        self.train_df = train_df
        self.valid_df = valid_df
        # NOTE: trailing commas used to wrap the next two attributes in
        # 1-tuples, so the dataset received `(tokenizer,)` and the DataLoader
        # received `(batch_size,)`.
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.text_max_token_length = text_max_token_length
        self.output_max_token_length = output_max_token_length

    def setup(self, stage = None):
        """Build the train and validation datasets (``stage`` is not used)."""
        self.train_dataset = T5Dataset(
            self.train_df,
            self.tokenizer,
            self.text_max_token_length,
            self.output_max_token_length,
        )
        self.valid_dataset = T5Dataset(
            self.valid_df,
            self.tokenizer,
            self.text_max_token_length,
            self.output_max_token_length,
        )

    def train_dataloader(self):
        """Return the shuffled training dataloader."""
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
        )

    def val_dataloader(self):
        """Return the validation dataloader (fixed order)."""
        return DataLoader(
            self.valid_dataset,
            batch_size=self.batch_size,
            shuffle=False,
        )
#data_module = T5DataSetModule(train_df,valid_df,t5_tokenizer,BATCH_SIZE)       

In [None]:
# Binary operators understood by the evaluator; the first argument is the
# left-hand operand.
_ARITH_OPERATORS = {
    '+': lambda left, right: left + right,
    '-': lambda left, right: left - right,
    '*': lambda left, right: left * right,
    '/': lambda left, right: left / right,
    '%': lambda left, right: left % right,
}


def _evaluate_prefix_expression(expression, operands):
    """Evaluate one space-separated prefix expression; return None if invalid."""
    stack = []
    # Scanning a prefix expression right-to-left lets it be evaluated with the
    # usual postfix stack algorithm.
    for symbol in reversed(expression.split(' ')):
        if symbol in _ARITH_OPERATORS:
            if len(stack) < 2:
                return None
            left = stack.pop()
            right = stack.pop()
            try:
                stack.append(_ARITH_OPERATORS[symbol](left, right))
            except ZeroDivisionError:
                return None
        elif "number" in symbol:
            # "numberK" refers to operands[K]; K may have several digits.
            suffix = symbol[6:]
            if not suffix.isdigit():
                return None
            index = int(suffix)
            if index >= len(operands):
                return None
            stack.append(operands[index])
        # Any other token (e.g. an empty string from doubled spaces) is ignored.

    if len(stack) != 1:
        return None
    return stack.pop()


def postfix_evaluation(batch_data,input_values):
    """Evaluate a batch of prefix-notation equations over per-sample operands.

    Despite the name, the expressions are in prefix form (operator first, e.g.
    ``"+ - number0 number1 number2"``); each one is reversed and evaluated with
    a stack. ``numberK`` tokens are replaced by ``input_values[i][K]``.

    Args:
        batch_data: Sequence of expression strings.
        input_values: Sequence (same length) of indexable operand collections.

    Returns:
        ``torch.Tensor`` with one value per expression. Expressions that are
        malformed, reference a missing operand, or divide/modulo by zero
        evaluate to ``0``.
    """
    output_values = []

    for i in range(len(batch_data)):
        value = _evaluate_prefix_expression(batch_data[i], input_values[i])
        output_values.append(0 if value is None else value)

    return torch.tensor(output_values)

# Smoke check of the evaluator: (1 - 4) + 6 = 3 and ((5 - 7) / 6) + 8 ≈ 7.6667.
ans = postfix_evaluation(["+ - number0 number1 number2","+ / - number0 number2 number1 number3"],[[1,4,6],[5,6,7,8]])

In [None]:
import math

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to embeddings, then dropout.

    The table is stored as a buffer of shape ``(max_len, 1, dim_model)`` and is
    sliced along its first dimension, so ``forward`` expects sequence-first
    input ``(seq_len, batch, dim_model)``.

    Args:
        dim_model: Embedding size (even or odd).
        dropout_p: Dropout probability applied after the addition.
        max_len: Number of positions to precompute.
    """

    def __init__(self,dim_model,dropout_p,max_len) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout_p)

        pos_encoding = torch.zeros(max_len, dim_model)

        # Column vector of positions: (max_len, 1).
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1)
        # One frequency per even column: 10000^(-2i / dim_model).
        division_term = torch.exp(
            torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model
        )

        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        # For odd dim_model there is one fewer odd column than even columns,
        # so trim the frequencies to match (a no-op when dim_model is even).
        pos_encoding[:, 1::2] = torch.cos(
            positions_list * division_term[: dim_model // 2]
        )

        # (max_len, dim_model) -> (max_len, 1, dim_model) so it broadcasts over
        # the batch dimension of sequence-first input.
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        # Buffer: moves with the module across devices but is not trained.
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        """Return ``dropout(token_embedding + positional table)``.

        Args:
            token_embedding: Tensor whose first dimension indexes positions.
        """
        seq_len = token_embedding.size(0)
        return self.dropout(token_embedding + self.pos_encoding[:seq_len, :])

In [None]:
class TransformerModel(nn.Module):
    """Encoder-decoder ``nn.Transformer`` with separate source/target embeddings.

    All tensors are batch-first: token ids are ``(batch, seq_len)`` and the
    returned logits are ``(batch, target_len, num_tokens_output)``.

    Args:
        num_tokens_input: Source vocabulary size.
        num_tokens_output: Target vocabulary size (also the logit dimension).
        dim_model: Embedding / model width.
        num_heads: Attention heads.
        num_encoder_layers: Encoder depth.
        num_decoder_layers: Decoder depth.
        dim_feedforward: Width of the feed-forward sublayers.
        dropout_p: Dropout probability (transformer and positional encoder).
    """

    def __init__(
        self,
        num_tokens_input,
        num_tokens_output,
        dim_model,
        num_heads,
        num_encoder_layers,
        num_decoder_layers,
        dim_feedforward,
        dropout_p
    ):
        super().__init__()

        self.positional_encoder = PositionalEncoding(
            dim_model=dim_model,
            dropout_p=dropout_p,
            max_len=5000
        )

        self.src_embedding = nn.Embedding(num_tokens_input, dim_model)
        self.trg_embedding = nn.Embedding(num_tokens_output, dim_model)

        self.dim_model = dim_model

        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout_p,
            batch_first=True
        )

        self.out = nn.Linear(self.dim_model, num_tokens_output)

    def _embed(self, embedding, token_ids):
        """Embed ``token_ids``, scale by sqrt(dim_model), add positions."""
        embedded = embedding(token_ids) * math.sqrt(self.dim_model)
        # The positional encoder slices its table by dimension 0, i.e. it is
        # sequence-first, while this model is batch-first: swap the two leading
        # dimensions around the call so positions follow the token index.
        return self.positional_encoder(embedded.transpose(0, 1)).transpose(0, 1)

    def forward(self, src, trg, src_padding_mask=None,target_mask=None, target_padding_mask=None):
        """Compute target-vocabulary logits.

        Args:
            src: Source token ids, ``(batch, src_len)``.
            trg: Decoder input token ids, ``(batch, trg_len)``.
            src_padding_mask: Optional ``(batch, src_len)`` mask, True at padding.
            target_mask: Optional ``(trg_len, trg_len)`` attention mask for the
                decoder (e.g. from ``get_tgt_mask``).
            target_padding_mask: Optional ``(batch, trg_len)`` mask, True at padding.

        Returns:
            Logits of shape ``(batch, trg_len, num_tokens_output)``.
        """
        # Previously the embedded target was stored in an unused variable and
        # the raw ids were sent to the transformer.
        src_embedded = self._embed(self.src_embedding, src)
        trg_embedded = self._embed(self.trg_embedding, trg)

        transformer_out = self.transformer(
            src_embedded,
            trg_embedded,
            tgt_mask=target_mask,
            src_key_padding_mask=src_padding_mask,
            # nn.Transformer names this argument `tgt_key_padding_mask`.
            tgt_key_padding_mask=target_padding_mask,
            # Keep the decoder from attending to padded encoder positions.
            memory_key_padding_mask=src_padding_mask,
        )
        return self.out(transformer_out)

    def get_tgt_mask(self,size):
        """Return a ``(size, size)`` float causal mask.

        Entries on and below the diagonal are ``0.0``; entries above it are
        ``-inf`` so a position cannot attend to later positions.
        """
        mask = torch.tril(torch.ones(size, size) == 1)
        mask = mask.float()
        mask = mask.masked_fill(mask == 0, float('-inf'))
        mask = mask.masked_fill(mask == 1, float(0.0))
        return mask

    def get_padding_mask(self,matrix,pad_token):
        """Return a boolean mask that is True where ``matrix == pad_token``."""
        return (matrix == pad_token)

In [None]:
import torch.optim as optim
class TranformerTranslator(pl.LightningModule):
    """Lightning wrapper that trains ``TransformerModel`` with teacher forcing.

    Batches are dicts with the keys ``input_text_ids``,
    ``input_attention_mask``, ``output_text_ids`` and
    ``output_attention_mask``. The attention masks are interpreted with the
    Hugging Face convention (1 = real token, 0 = padding).

    Args:
        num_tokens_input: Source vocabulary size.
        num_tokens_output: Target vocabulary size.
        dim_model: Model width.
        num_heads: Attention heads.
        num_encoder_layers: Encoder depth.
        num_decoder_layers: Decoder depth.
        dim_feedforward: Feed-forward width.
        dropout_p: Dropout probability.
    """

    def __init__(
        self,
        num_tokens_input,
        num_tokens_output,
        dim_model,
        num_heads,
        num_encoder_layers,
        num_decoder_layers,
        dim_feedforward,
        dropout_p
    ):
        super().__init__()
        self.transformer = TransformerModel(
            num_tokens_input=num_tokens_input,
            num_tokens_output=num_tokens_output,
            dim_model=dim_model,
            num_heads=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout_p=dropout_p
        )

        # Default ignore_index is -100, which _shared_step uses for padding.
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, src, trg, src_padding_mask=None,target_mask=None, target_padding_mask=None):
        """Delegate to the wrapped ``TransformerModel`` and return its logits."""
        return self.transformer(src, trg, src_padding_mask, target_mask, target_padding_mask)

    @staticmethod
    def _causal_mask(size, device):
        """Boolean ``(size, size)`` mask, True where attention is NOT allowed."""
        return torch.triu(
            torch.ones(size, size, dtype=torch.bool, device=device), diagonal=1
        )

    def _shared_step(self, batch_data, log_name):
        """Run one teacher-forced pass, log the loss as ``log_name``, return it."""
        input_text_ids = batch_data['input_text_ids']
        input_attention_mask = batch_data['input_attention_mask']
        output_text_ids = batch_data['output_text_ids']
        output_attention_mask = batch_data['output_attention_mask']

        # Teacher forcing: the decoder sees tokens [0, T-1) and must predict
        # tokens [1, T).
        # NOTE(review): the first target token is therefore never predicted;
        # confirm whether a start token should be prepended to the targets.
        output_in = output_text_ids[:, :-1]
        output_expected = output_text_ids[:, 1:]

        # `.shape` is not callable; `.size(1)` gives the decoder length.
        target_mask = self._causal_mask(output_in.size(1), output_in.device)
        src_padding_mask = input_attention_mask == 0
        target_padding_mask = output_attention_mask[:, :-1] == 0

        predictions = self(
            input_text_ids,
            output_in,
            src_padding_mask=src_padding_mask,
            target_mask=target_mask,
            target_padding_mask=target_padding_mask,
        )

        # Exclude padded target positions from the loss.
        labels = output_expected.masked_fill(output_attention_mask[:, 1:] == 0, -100)

        # CrossEntropyLoss expects (N, C) logits against (N,) class indices, so
        # flatten the batch and time dimensions together.
        loss = self.loss_fn(
            predictions.reshape(-1, predictions.size(-1)),
            labels.reshape(-1),
        )

        self.log(log_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch_data, batch_idx=None):
        """Compute and log ``train_loss`` for one batch."""
        return self._shared_step(batch_data, "train_loss")

    def validation_step(self, batch_data, batch_idx=None):
        """Compute and log ``valid_loss`` for one batch."""
        return self._shared_step(batch_data, "valid_loss")

    def configure_optimizers(self):
        """Adam over all parameters with learning rate 1e-4."""
        return optim.Adam(self.parameters(), lr=0.0001)


In [None]:
class T5ArithTranslator(pl.LightningModule):
    """Lightning wrapper that fine-tunes pretrained ``t5-small``.

    Batches are dicts with the keys ``input_text_ids``,
    ``input_attention_mask``, ``output_text_ids`` and
    ``output_attention_mask``.
    """

    def __init__(self):
        super().__init__()
        self.t5_model = T5ForConditionalGeneration.from_pretrained("t5-small")

    def forward(self, input_ids, input_attention_mask, decoder_attention_mask=None, labels=None):
        """Run T5 and return ``(loss, logits)``.

        Args:
            input_ids: Encoder token ids.
            input_attention_mask: Encoder attention mask.
            decoder_attention_mask: Attention mask of the target encoding
                (1 = real token, 0 = padding). Used to exclude padded label
                positions from the loss.
            labels: Target token ids; when omitted the returned loss is None.
        """
        if labels is not None and decoder_attention_mask is not None:
            # Hugging Face models ignore label positions set to -100; without
            # this the loss is averaged over the padding as well.
            labels = labels.masked_fill(decoder_attention_mask == 0, -100)

        outs = self.t5_model(
            input_ids=input_ids,
            attention_mask=input_attention_mask,
            labels=labels,
        )
        return outs.loss, outs.logits

    def _shared_step(self, batch, log_name):
        """Compute the loss for ``batch``, log it as ``log_name``, return it."""
        loss, _ = self(
            batch["input_text_ids"],
            batch["input_attention_mask"],
            batch["output_attention_mask"],
            batch["output_text_ids"],
        )
        self.log(log_name, loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx) :
        """Compute and log ``train_loss`` for one batch."""
        return self._shared_step(batch, "train_loss")

    def validation_step(self, batch, batch_idx):
        """Compute and log ``valid_loss`` for one batch."""
        return self._shared_step(batch, "valid_loss")

    def configure_optimizers(self):
        """Adam over all parameters with learning rate 1e-4."""
        # Referenced through `torch` so this cell does not rely on the
        # `import torch.optim as optim` that lives in another cell.
        return torch.optim.Adam(self.parameters(), lr=0.0001)


In [None]:
# Training configuration.
N_EPOCHS = 50
BATCH_SIZE = 32

model = T5ArithTranslator()

# Keep only the checkpoint with the lowest validation loss.
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="transformer-scratch-best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="valid_loss",
    mode="min"
)

logger = TensorBoardLogger("lightning_logs", name="translator")

# `gpus=1` contradicted `accelerator='cpu'` (and the `gpus` argument no longer
# exists in recent Lightning releases). Select the hardware with the
# accelerator/devices pair instead; switch to accelerator='gpu' to train on GPU.
trainer = pl.Trainer(
    logger=logger,
    callbacks=[checkpoint_callback],
    max_epochs=N_EPOCHS,
    log_every_n_steps=5,
    accelerator='cpu',
    devices=1,
)

In [None]:
# Train `model` on the training dataloader, validating on the validation dataloader.
trainer.fit(model,train_dataloader,valid_dataloader)

### Inference Model

In [None]:
# Restore the fine-tuned model from a checkpoint and freeze it for inference.
# NOTE(review): this is an absolute, machine-specific path, and its file name
# ("best-checkpoint-v1.ckpt") differs from the name configured on the
# ModelCheckpoint callback ("transformer-scratch-best-checkpoint") — confirm
# which checkpoint is intended and prefer a path relative to the notebook.
test_model = T5ArithTranslator.load_from_checkpoint(
    '/Users/depressedcoder/DLNLP/Assignment5/partb/checkpoints/best-checkpoint-v1.ckpt'
)
test_model.freeze()

# Rebuild the tokenizer exactly as for training, including the "[SEP]"
# special token, so this cell also works when run on its own.
t5_tokenizer = T5Tokenizer.from_pretrained("t5-small")
special_tokens_dict = {'additional_special_tokens' : ['[SEP]']}
num_added_tokens = t5_tokenizer.add_special_tokens(special_tokens_dict)


In [None]:
# Tokenize one sample problem in the training format: description, the
# "[SEP]" marker, then the question.
test_input_ids = t5_tokenizer("last stop in their field trip was the aquarium . penny identified number0 species of sharks number1 species of eels and number2 different species of whales . [SEP] how many species was penny able to identify ?",return_tensors='pt').input_ids

In [None]:
# Generate an equation with generate()'s default settings, decode the first
# sequence without special tokens, and print its space-separated tokens.
outputs = test_model.t5_model.generate(test_input_ids)
text = t5_tokenizer.decode(outputs[0], skip_special_tokens=True)
print(text.split(' '))