### Transformers

In [None]:
! pip install torchtext==0.6.0

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

! pip install -U spacy
! python -m spacy download fr_core_news_sm
! python -m spacy download en_core_web_sm

In [1]:
import torch
import torchtext

import numpy as np
import pandas as pd
import spacy

import matplotlib.pyplot as plt
import IPython.display as display
import random

In [None]:
if torch.backends.mps.is_available():
    DEVICE = torch.device(device='mps')
elif torch.cuda.is_available():
    DEVICE = torch.device(device='cuda')
else:
    DEVICE = torch.device(device='cpu')

In [2]:
DEVICE = torch.device(device='cpu')

### Preprocessing

In [3]:
spacy_en = spacy.load('en_core_web_sm')
spacy_fr = spacy.load('fr_core_news_sm')

In [4]:
french = torchtext.data.Field(tokenize= lambda text: [token.text for token in spacy_en.tokenizer(text)],
                              lower = True,
                              init_token = '<sos>',
                              eos_token = '<eos>')

english = torchtext.data.Field(tokenize= lambda text: [token.text for token in spacy_fr.tokenizer(text)],
                              lower = True,
                              init_token = '<sos>',
                              eos_token = '<eos>')

In [7]:
fields = {
    'English': ('eng', english),
    'French': ('fre', french)

}

train_data, test_data = torchtext.data.TabularDataset.splits(
    path='data/',
    train='train_25.csv',
    test = 'test_25.csv',
    format='csv',
    fields=fields
)

In [8]:
english.build_vocab(train_data, max_size = 10000, min_freq = 2)
french.build_vocab(train_data, max_size = 10000, min_freq = 2)

In [9]:
batch_size = 64
train_iterator, test_iterator = torchtext.data.BucketIterator.splits(
    (train_data, test_data),
    batch_size = batch_size,
    sort_within_batch = True, # Protizes to have examples are of similar length in a batch, because it reduces padding and save compute.
    sort_key = lambda x: len(getattr(x, 'eng')), # Protizes to have examples are of similar length in a batch, because it reduces padding and save compute.
    device=DEVICE
)

### Learning Loops

In [10]:
pad_idx = french.vocab.stoi['<pad>']
criterion = torch.nn.CrossEntropyLoss(ignore_index=pad_idx)

In [11]:
def one_epoch_train(model: torch.nn.Module, data_loader_train: torch.utils.data.DataLoader,
                    loss_criterion: torch.nn, optim_alog: torch.optim) -> tuple:
    """Function that trains the model for one epoch.

    Args:
        model (torch.nn.Module): Pytorch model we want to train.
        data_loader_train (torch.utils.data.DataLoader): Pytorch dataloader that carries training data.
        loss_criterion (torch.nn): Pytorch loss criteria on which we calculate loss.
        optim_alog (torch.optim): Opimiztion algoritham that we use to update model weights.

    Returns:
        tuple: Tuple carrying Train loss and accuracy
    """
    batch_loss_train = []
    batch_counter = 0
    for batch in data_loader_train:
        input_text = batch.eng.to(DEVICE)
        target_text = batch.fre.to(DEVICE)


        # Enabling model training.
        model.train(True)


        #Setting gradients to zero to prevent gradient accumulation.
        optim_alog.zero_grad()

        # Forward pass.
        y_pred_prob = model(input_text, target_text)

        y_pred_prob = y_pred_prob[1:].reshape(-1, y_pred_prob.shape[2])
        target_text = target_text[1:].reshape(-1)

        loss = loss_criterion(y_pred_prob, target_text)

        batch_loss_train.append(loss.item())

        # Back Propagation
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        # Updating weights
        optim_alog.step()
        
        batch_counter += 1

        del(input_text)
        del(target_text)

    return sum(batch_loss_train)/batch_counter

In [12]:
def inference(model: torch.nn.Module, data_loader_val: torch.utils.data.DataLoader, loss_criterion: torch.nn) -> tuple:
    """Function that calculates test accuracy

    Args:
        model (torch.nn.Module): Pytorch model we want to make inference on.
        data_loader_val (torch.utils.data.DataLoader): Pytorch dataloader that carries validation data.
        loss_criterion (torch.nn): Pytorch loss criteria on which we calculate loss.

    Returns:
        tuple: Tuple carrying Test loss and accuracy
    """

    batch_loss_train = []
    batch_counter = 0

    for batch in data_loader_val:
        input_text = batch.eng.to(DEVICE)
        target_text = batch.fre.to(DEVICE)

        # Disabiling model training.
        model.train(False)

        with torch.inference_mode():
            # Forward Pass
            y_pred_prob = model(input_text, target_text)

            y_pred_prob = y_pred_prob[1:].reshape(-1, y_pred_prob.shape[2])
            target_text = target_text[1:].reshape(-1)

            # Calculating Loss
            loss = loss_criterion(y_pred_prob, target_text)
            batch_loss_train.append(loss.item())

        batch_counter += 1

        del(input_text)
        del(target_text)

    return sum(batch_loss_train)/batch_counter

In [13]:
def training_loop(model: torch.nn.Module, data_loader_train: torch.utils.data.DataLoader, data_loader_val: torch.utils.data.DataLoader,
                  epochs:int, loss_criterion: torch.nn, optim_alog: torch.optim)-> dict:
    """Function that trains the model for the given number of epochs

    Args:
        model (torch.nn.Module): Pytorch model we want to train.
        data_loader_train (torch.utils.data.DataLoader): Pytorch dataloader that carries training data.
        data_loader_val (torch.utils.data.DataLoader): Pytorch dataloader that carries validation data.
        epochs (int): Count of EPOCHS
        loss_criterion (torch.nn): Pytorch loss criteria on which we calculate loss.
        optim_alog (torch.optim): Opimiztion algoritham that we use to update model weights.

    Returns:
        dict: A dictionary that carries the output metrics.
    """

    loss_train = []
    loss_val = []

    # Loop that iterates over each EPOCH
    for epoch in range(epochs):

        #Train the model for one EPOCH
        epoch_loss = one_epoch_train(model, data_loader_train, loss_criterion, optim_alog)
        loss_train.append(epoch_loss)

        # Caluclating Testing results
        val_loss = inference(model, data_loader_val, loss_criterion)
        loss_val.append(val_loss)

        if (epoch+1)%1 == 0:
            print('For Epoch {} We Train Loss:{}, Val Loss:{}'.format(epoch+1, epoch_loss,val_loss))
    return {'training_loss':loss_train, 'val_loss':loss_val}

In [14]:
def plot_metrics(epochs: int,metrics: dict) -> None:
    """Plot the graphs of Training and Testing Accuracy and Loss across Epoches

    Args:
        epochs (int): Number of Epochs
        metrics (dict): A dictionary containing Test and Training datasets' Loss and accuracy
    """

    plt.figure(figsize=(10,10))
    plt.plot(list(range(epochs)), metrics['training_loss'])
    plt.plot(list(range(epochs)), metrics['val_loss'])
    plt.grid()
    plt.legend(['Train', 'Test'])
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Train and Validation loss across epochs')

    plt.show()

In [16]:
def sentence_translator(model: torch.nn.Module, sentence: str, src_corpus: torchtext.data.Field, tgt_corpus: torchtext.data.Field) -> list:
    """Given the model and english sentence it will translate the english sentence to french.

    Args:
        model (torch.nn.Model): Pytorch Model
        sentence (str): English sentence
        src_corpus (torchtext.data.Field): English Corpus (Source Torchtext data field)
        tgt_corpus (torchtext.data.Field): French Corpus (Destination Torchtext data field)

    Returns:
        list: List of words 
    """
    
    # Checking If the sentence is string or not.
    if isinstance(sentence, str):
        tokens = [token.text.lower() for token in spacy_en(sentence)]
    else:
        tokens = [token.lower() for token in sentence]
        
    # Attaching <SOS> token at the beginning of the source sentence
    tokens.insert(0, src_corpus.init_token)
    
    # Attaching <EOS> token at the end of the source sentence
    tokens.append(src_corpus.eos_token)
    
    # Converting the soruce text to sentence vector 
    sentence_vector = [src_corpus.vocab.stoi[token] for token in tokens]
    
    # Creating a tensor from the vector
    sentence_tensor = torch.LongTensor(sentence_vector).unsqueeze(1).to(DEVICE)
    
    # Performing encoding
    with torch.inference_mode():
        hidden_state, cell_state = model.encoder(sentence_tensor)
        
    # Attaching <SOS> token at the beginning of the destination sentence. 
    outputs = [tgt_corpus.vocab.stoi["<sos>"]]
    
    # Iteration over and producing the sequnce of words of the translated sentence. 
    for _ in range(20):
        
        # Getting previous word to pass it to decoder
        previous_word = torch.LongTensor([outputs[-1]]).to(DEVICE)
        
        # Performing Decoding
        with torch.inference_mode():
            output, hidden_state, cell_state = model.decoder(previous_word, hidden_state, cell_state)
            # Predicting the word
            word_pred = torch.argmax(output, axis=1).item()
            
        # Append the Predicted word 
        outputs.append(word_pred)
        
        # On reaching end of the sentence break the loop
        if torch.argmax(output, axis=1).item() == tgt_corpus.vocab.stoi["<eos>"]:
            break
    
    # Converting translated sentence vector to translated sentence.
    translated_sentence = [tgt_corpus.vocab.itos[idx] for idx in outputs]

    return translated_sentence[1:]

In [17]:
def score_bleu(dataset: torchtext.data.TabularDataset.splits, model: torch.nn.Module, src_corpus: torchtext.data.Field, tgt_corpus: torchtext.data.Field) -> int:
    """ Get BLeU score of the given dataset and its translation.

    Args:
        dataset (torchtext.data.TabularDataset.splits): torch text data set
        model (torch.nn.Model): pytorch model
        src_corpus (torchtext.data.Field): English Corpus (Source Torchtext data field)
        tgt_corpus (torchtext.data.Field): French Corpus (Destination Torchtext data field)

    Returns:
        int: Bleu score of the translation.
    """
    targets = []
    outputs = []

    for translation_record in dataset:
        source_sentence = vars(translation_record)["eng"]
        target_sentence = vars(translation_record)["fre"]

        translated_sentence = sentence_translator(model, source_sentence, src_corpus, tgt_corpus)
        translated_sentence = translated_sentence[:-1]

        targets.append([target_sentence])
        outputs.append(translated_sentence)

    return torchtext.data.metrics.bleu_score(outputs, targets)

## Model Building

#### Transformers

In [42]:
class Transformer(torch.nn.Module):
    def __init__(self, embedding_size, src_vocab_size, target_vocab_size, src_pad_idx, num_heads, num_encoder_layers,
                 num_decoder_layers, forward_expansion, dropout, max_len):

        super(Transformer, self).__init__()
        self.src_word_embedding = torch.nn.Embedding(src_vocab_size, embedding_size)
        # Since Transformers are permuationally invariant
        self.src_position_embedding = torch.nn.Embedding(max_len, embedding_size)

        self.trg_word_embedding = torch.nn.Embedding(target_vocab_size, embedding_size)
        self.trg_position_embedding = torch.nn.Embedding(max_len, embedding_size)


        self.transformer = torch.nn.Transformer(embedding_size, num_heads, num_encoder_layers, num_decoder_layers,
                                                forward_expansion, dropout)
        self.linear = torch.nn.Linear(embedding_size, target_vocab_size)
        self.dropout = torch.nn.Dropout(dropout)

        self.src_pad_idx = src_pad_idx

    def make_src_mask(self, src):
        # Src shape: (src_len, N) -> By pyrotch implementation takes it as opposit (N, Src_len) hence transpose
        src_mask = src.transpose(0,1) == self.src_pad_idx
        return src_mask.to(DEVICE)

    def forward(self, src, trg):
        src_seq_length, N = src.shape
        trg_seq_length, N = trg.shape
        
        src_positions = (
            torch.arange(0, src_seq_length).unsqueeze(1).expand(src_seq_length, N).to(DEVICE)
            )

        trg_positions = (
            torch.arange(0, trg_seq_length).unsqueeze(1).expand(trg_seq_length, N).to(DEVICE)
            )

        embed_src = self.dropout(
            (self.src_word_embedding(src)+self.src_position_embedding(src_positions))
        )

        embed_trg = self.dropout(
            (self.trg_word_embedding(trg)+self.trg_position_embedding(trg_positions))
        )

        src_padding_mask = self.make_src_mask(src)
        
        trg_mask = self.transformer.generate_square_subsequent_mask(trg_seq_length).to(DEVICE)
        
        print(src_padding_mask.shape)
        print(trg_mask.shape)
        
        print(embed_src.shape)
        print(embed_trg.shape)

        out = self.transformer(embed_src, embed_trg, src_padding_mask, trg_mask)
        print('ZZZZZ')

        out = self.linear(out)

        return out

In [43]:
trans_model = Transformer(
    embedding_size = 512,
    src_vocab_size= len(english.vocab), 
    target_vocab_size= len(french.vocab), 
    src_pad_idx= english.vocab.stoi["<pad>"], 
    num_heads=8, 
    num_encoder_layers=3,
    num_decoder_layers=3, 
    forward_expansion=2048, 
    dropout=0.10, 
    max_len=25
).to(DEVICE)

In [44]:
EPOCHS = 20
optimizer = torch.optim.Adam(trans_model.parameters(), lr=3e-4)

In [45]:
output_metrics = training_loop(model = trans_model, data_loader_train = train_iterator, data_loader_val = test_iterator,
                               epochs = EPOCHS, loss_criterion = criterion, optim_alog = optimizer)

torch.Size([64, 5])
torch.Size([9, 9])
torch.Size([5, 64, 512])
torch.Size([9, 64, 512])


RuntimeError: The shape of the 2D attn_mask is torch.Size([64, 5]), but should be (5, 5).

In [None]:
plot_metrics(EPOCHS, output_metrics)

In [None]:
print('Train BLEU Score:{0}'.format(score_bleu(train_data, trans_model, english, french)))

In [None]:
print('Test BLEU Score:{0}'.format(score_bleu(test_data, trans_model, english, french)))