<a href="https://colab.research.google.com/github/aaronjoel/DeepUnderstandingOfDeepLearning/blob/main/chapter_10_seq2seq_lstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The StatQuest Illustrated Guide to Neural Networks and AI

## Chapter 10 - Seq2Seq and Encoder-Decoder Models with LSTMs

Copyright 2024, Joshua Starmer

In this notebook, we will build and train a Seq2Seq (or Encoder-Decoder) model that uses 2 layers of LSTMs, with 2 LSTM cells in each layer, as seen in the picture below.
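To make "2 layers with 2 LSTM cells each" concrete, here is a small sketch that builds an `nn.LSTM` with the same `input_size=2`, `hidden_size=2`, `num_layers=2` settings used below and inspects its shapes. It assumes PyTorch's unbatched input convention (which this notebook also relies on), and the random input `x` is only a stand-in for two word embeddings.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)  # make the random example repeatable

# Same settings as the encoder (and decoder) LSTM in this notebook
lstm = nn.LSTM(input_size=2, hidden_size=2, num_layers=2)

# An unbatched sequence of 2 tokens, each represented by 2 numbers.
# Random values stand in for real word embeddings here.
x = torch.randn(2, 2)  # shape: (sequence length, input_size)

output, (hidden, cell) = lstm(x)

print(output.shape)             # torch.Size([2, 2]): 2 outputs per token, from the top layer
print(hidden.shape)             # torch.Size([2, 2]): final short-term memory, one row per layer
print(cell.shape)               # torch.Size([2, 2]): final long-term memory, one row per layer
print(lstm.weight_ih_l0.shape)  # torch.Size([8, 2]): 4 gates x 2 cells, for 2 input numbers
```

`hidden` and `cell` are what an encoder hands to a decoder: one row per layer, with 2 numbers per row because each layer has 2 cells.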

In [1]:
%%capture
!pip install lightning

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

from torch.utils.data import TensorDataset, DataLoader

import lightning as L

## Create the datasets that we will use for training the Encoder-Decoder model

To make the model at least a little bit interesting, we will translate two English phrases, **Let's go** and **to go**, into Spanish. **Let's go** should translate to **vamos \<EOS\>** and **to go** should translate to **ir \<EOS\>**.
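Typing each id by hand works for two phrases, but the same tensors can also be built from plain text. The sketch below uses a hypothetical helper, `to_ids`, and a hypothetical `pairs` list; it assumes tokens are separated by spaces and that the apostrophe in "Let's" has already been dropped (as in the `'lets'` token). It appends `<EOS>` only to the Spanish targets, matching the labels built in the next cell.

```python
import torch

english_token_to_id = {'lets': 0, 'to': 1, 'go': 2, '<EOS>': 3}
spanish_token_to_id = {'ir': 0, 'vamos': 1, 'y': 2, '<EOS>': 3}

# Hypothetical (English, Spanish) training pairs, already lower-cased and without apostrophes
pairs = [("lets go", "vamos"), ("to go", "ir")]

def to_ids(text, token_to_id, add_eos=False):
    """Hypothetical helper: split text on spaces and look up each token's id."""
    tokens = text.split()
    if add_eos:
        tokens.append('<EOS>')  # mark the end of the output sequence
    return [token_to_id[token] for token in tokens]

inputs = torch.tensor([to_ids(english, english_token_to_id) for english, _ in pairs])
labels = torch.tensor([to_ids(spanish, spanish_token_to_id, add_eos=True) for _, spanish in pairs])

print(inputs.tolist())  # [[0, 2], [1, 2]]
print(labels.tolist())  # [[1, 3], [0, 3]]
```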

In [5]:
# first, we create a dictionary that maps vocabulary tokens to id numbers...
english_token_to_id = {
    'lets': 0,
    'to': 1,
    'go': 2,
    '<EOS>': 3  ### <EOS> = end of sequence
}
print(english_token_to_id)

## ...then we create a dictionary that maps the ids to tokens. This will help us interpret the output.
## We use the "map()" function to apply the "reversed()" function to each tuple (e.g. ('lets', 0)) stored
## in the english_token_to_id dictionary. We then use dict() to make a new dictionary from the
## reversed tuples.
english_id_to_token = dict(map(reversed, english_token_to_id.items()))
print(english_id_to_token)

spanish_token_to_id = {
    'ir': 0,
    'vamos': 1,
    'y': 2,
    '<EOS>': 3
}

print(spanish_token_to_id)

spanish_id_to_token = dict(map(reversed, spanish_token_to_id.items()))
print(spanish_id_to_token)

inputs = torch.tensor([[english_token_to_id['lets'],
                        english_token_to_id['go']],

                       [english_token_to_id['to'],
                        english_token_to_id['go']]])

labels = torch.tensor([[spanish_token_to_id['vamos'],
                        spanish_token_to_id['<EOS>']],

                       [spanish_token_to_id['ir'],
                        spanish_token_to_id['<EOS>']]])

{'lets': 0, 'to': 1, 'go': 2, '<EOS>': 3}
{0: 'lets', 1: 'to', 2: 'go', 3: '<EOS>'}
{'ir': 0, 'vamos': 1, 'y': 2, '<EOS>': 3}
{0: 'ir', 1: 'vamos', 2: 'y', 3: '<EOS>'}
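The `dict(map(reversed, ...))` idiom above is compact but a little cryptic. This minimal sketch shows that it builds the same dictionary as a dictionary comprehension, which some readers may find easier to follow. Both versions assume every id is unique; otherwise later tokens would silently overwrite earlier ones.

```python
english_token_to_id = {'lets': 0, 'to': 1, 'go': 2, '<EOS>': 3}

# map(reversed, ...) turns each (token, id) pair into an (id, token) iterator,
# and dict() builds a new dictionary from those reversed pairs.
via_map = dict(map(reversed, english_token_to_id.items()))

# An equivalent dictionary comprehension that swaps keys and values
via_comprehension = {idx: token for token, idx in english_token_to_id.items()}

print(via_map == via_comprehension)  # True
print(via_comprehension[3])          # <EOS>
```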


Now that we have created the training data, we'll store it in a `DataLoader`. Since our dataset is so small, using a `DataLoader` is a bit of overkill, but it is easy to do, and it will allow us to easily scale up to a much larger vocabulary when the time comes.
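With the default `batch_size=1`, each item the `DataLoader` yields holds one phrase pair with an extra leading batch dimension of size 1. That is why `training_step()` in the model class picks out `input_tokens[0]` and `labels[0]`. A small sketch that re-creates the same id tensors so it runs on its own:

```python
import torch
from torch.utils.data import TensorDataset, DataLoader

# Same token ids as the inputs and labels built above
inputs = torch.tensor([[0, 2], [1, 2]])  # "lets go", "to go"
labels = torch.tensor([[1, 3], [0, 3]])  # "vamos <EOS>", "ir <EOS>"

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)  # defaults: batch_size=1, shuffle=False

batch_shapes = []
for input_tokens, label_tokens in dataloader:
    # Each batch holds exactly one phrase pair, so both shapes are (1, 2)
    batch_shapes.append((tuple(input_tokens.shape), tuple(label_tokens.shape)))
    print(input_tokens.shape, label_tokens.shape)
```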

In [6]:
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)

In [7]:
class seq2seq(L.LightningModule):

    def __init__(self, max_len=2):

        super().__init__()

        self.max_output_length = max_len

        L.seed_everything(seed=420)

        #################################
        ##
        ## ENCODING
        ##
        #################################
        self.encoder_we = nn.Embedding(num_embeddings=4, # num_embeddings = # of words in input vocabulary
                                       embedding_dim=2)  # embedding_dim = 2 numbers per embedding

        self.encoder_lstm = nn.LSTM(input_size=2, # input_size = number of inputs (2 numbers per word)
                                    hidden_size=2,# hidden_size = number of outputs (2 per word per layer)
                                    num_layers=2) # num_layers = how many lstm's to stack
                                                  #          If there are 2 layers, then the short term memory from the
                                                  #          first layer is used as input to the second layer

        #################################
        ##
        ## DECODING
        ##
        #################################
        self.decoder_we = nn.Embedding(num_embeddings=4,
                                       embedding_dim=2)

        self.decoder_lstm = nn.LSTM(input_size=2,
                                    hidden_size=2,
                                    num_layers=2)

        self.output_fc = nn.Linear(in_features=2,  # in_features = # of outputs per LSTM
                                   out_features=4) # out_features = # of words in the output vocabulary

        #################################
        ##
        ## Training
        ##
        #################################
        self.loss = nn.CrossEntropyLoss()


    def forward(self, input, output=None):

        #################################
        ##
        ## ENCODING
        ##
        #################################
        ## first, use the encoder stage to create an intermediate encoding of the input text
        encoder_embeddings = self.encoder_we(input)
        encoder_lstm_output, (encoder_lstm_hidden, encoder_lstm_cell) = self.encoder_lstm(encoder_embeddings)

        #################################
        ##
        ## DECODING
        ##
        #################################
        ## We start by initializing the decoder with the <EOS> token...
        decoder_token_id = torch.tensor([spanish_token_to_id["<EOS>"]])
        decoder_embeddings = self.decoder_we(decoder_token_id)

        decoder_lstm_output, (decoder_lstm_hidden, decoder_lstm_cell) = self.decoder_lstm(decoder_embeddings,
                                                                                          (encoder_lstm_hidden,
                                                                                           encoder_lstm_cell))

        output_values = self.output_fc(decoder_lstm_output)
        outputs = output_values

        predicted_id = torch.tensor([torch.argmax(output_values)])
        predicted_ids = predicted_id

        for i in range(1, self.max_output_length):

            if output is None: # using the model...
                if (predicted_id == spanish_token_to_id["<EOS>"]): # if the prediction is <EOS>, then we are done
                    break
                decoder_embeddings = self.decoder_we(predicted_id)
            else:
                ## run this when training the model
                decoder_embeddings = self.decoder_we(torch.tensor([output[i-1]]))

            decoder_lstm_output, (decoder_lstm_hidden, decoder_lstm_cell) = self.decoder_lstm(decoder_embeddings,
                                                                                              (decoder_lstm_hidden,
                                                                                               decoder_lstm_cell))

            output_values = self.output_fc(decoder_lstm_output)
            outputs = torch.cat((outputs, output_values), 0)
            predicted_id = torch.tensor([torch.argmax(output_values)])
            predicted_ids = torch.cat((predicted_ids, predicted_id))

        return outputs


    def configure_optimizers(self): # this configures the optimizer we want to use for backpropagation.
        return Adam(self.parameters(), lr=0.1) ## NOTE: Setting the learning rate to 0.1 trains way faster than
                                               ## using the default learning rate, lr=0.001


    def training_step(self, batch, batch_idx): # take a step during gradient descent.
        input_tokens, labels = batch # collect input
        output = self.forward(input_tokens[0], labels[0]) # run input through the neural network
        loss = self.loss(output, labels[0]) ## self.loss = cross entropy
        ###################
        ##
        ## Logging the loss
        ##
        ###################
        # self.log("train_loss", loss)

        return loss


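The `forward()` method above switches between two decoding strategies depending on whether `output` (the labels) is passed in. During training it uses teacher forcing: after each step, the *correct* previous token is fed back into the decoder. When we use the model, it feeds back its own prediction instead and stops as soon as it predicts `<EOS>`. The sketch below isolates just that loop, assuming an untrained decoder with the same sizes and zero tensors standing in for the encoder's final states; `decode` is a hypothetical helper, not part of the notebook.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

EOS = 3  # id of <EOS> in the Spanish vocabulary above

# A small, untrained decoder with the same sizes as the notebook's decoder
decoder_we = nn.Embedding(num_embeddings=4, embedding_dim=2)
decoder_lstm = nn.LSTM(input_size=2, hidden_size=2, num_layers=2)
output_fc = nn.Linear(in_features=2, out_features=4)

def decode(hidden, cell, max_len=2, labels=None):
    """Hypothetical helper: returns (logits, ids fed into the decoder).

    With labels (training), the correct previous token is fed back in (teacher forcing).
    Without labels (inference), the model's own prediction is fed back in, and
    decoding stops early once <EOS> is predicted.
    """
    token = torch.tensor([EOS])  # start the decoder with <EOS>, as the notebook does
    fed_ids, all_logits = [], []
    for i in range(max_len):
        fed_ids.append(token.item())
        lstm_out, (hidden, cell) = decoder_lstm(decoder_we(token), (hidden, cell))
        logits = output_fc(lstm_out)              # shape: (1, 4)
        all_logits.append(logits)
        predicted = torch.argmax(logits, dim=-1)  # shape: (1,)
        if labels is not None:
            token = labels[i].unsqueeze(0)        # teacher forcing: use the correct token
        else:
            if predicted.item() == EOS:           # greedy decoding: stop at <EOS>
                break
            token = predicted
    return torch.cat(all_logits, dim=0), fed_ids

# Stand-ins for the encoder's final memories: shape (num_layers, hidden_size)
h0, c0 = torch.zeros(2, 2), torch.zeros(2, 2)

train_logits, train_fed = decode(h0, c0, labels=torch.tensor([1, 3]))  # "vamos <EOS>"
print(train_fed)  # [3, 1]: <EOS>, then the correct previous token "vamos"

infer_logits, infer_fed = decode(h0, c0)
print(infer_fed)  # starts with 3 (<EOS>); the rest depends on the untrained weights
```

Teacher forcing keeps early training stable, because one wrong prediction is not fed forward into every later step.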
In [8]:
model = seq2seq()
outputs = model.forward(input=torch.tensor([english_token_to_id["lets"],
                                            english_token_to_id["go"]]), ## translate "lets go", we should get "vamos <EOS>"
                        output=None)

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
for id in predicted_ids:
    print("\t", spanish_id_to_token[id.item()])

INFO: Seed set to 420


Translated text:
	 <EOS>


In [9]:
trainer = L.Trainer(max_epochs=40, accelerator='cpu')
trainer.fit(model, train_dataloaders=dataloader)

INFO: Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
INFO: GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO: 
  | Name         | Type             | Params | Mode 
----------------------------------------------------------
0 | encoder_we   | Embedding        | 8      | train
1 | encoder_lstm | LSTM             | 96     | train
2 | decoder_we   | Embedding        | 8  


INFO: `Trainer.fit` stopped: `max_epochs=40` reached.


In [10]:
outputs = model.forward(input=torch.tensor([english_token_to_id["lets"],
                                            english_token_to_id["go"]]), ## translate "lets go", we should get "vamos <EOS>"
                        output=None)

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
for id in predicted_ids:
    print("\t", spanish_id_to_token[id.item()])

Translated text:
	 vamos
	 <EOS>


In [11]:
outputs = model.forward(input=torch.tensor([english_token_to_id["to"],
                                            english_token_to_id["go"]]), ## translate "to go", we should get "ir <EOS>"
                        output=None)

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
for id in predicted_ids:
    print("\t", spanish_id_to_token[id.item()])

Translated text:
	 ir
	 <EOS>


In [12]:
## count the number of parameters...
total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("Total number of trainable parameters:", total_trainable_params)

Total number of trainable parameters: 220
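The total of 220 can be checked by hand. The model has two `nn.Embedding` layers, two 2-layer `nn.LSTM`s, and one `nn.Linear` layer (`nn.CrossEntropyLoss` has no parameters). A sketch of the arithmetic, assuming PyTorch's default of including both `bias_ih` and `bias_hh` in every LSTM layer; the variable and function names are illustrative.

```python
import torch.nn as nn

vocab_size, embedding_dim, hidden_size, num_layers = 4, 2, 2, 2

# nn.Embedding: one row of embedding_dim numbers per token
embedding_params = vocab_size * embedding_dim  # 4 * 2 = 8

# nn.LSTM: each layer has 4 gates; each gate has input weights, hidden
# (short-term memory) weights, and two bias vectors (bias_ih and bias_hh)
def lstm_layer_params(input_size, hidden_size):
    return 4 * (hidden_size * input_size + hidden_size * hidden_size + 2 * hidden_size)

lstm_params = sum(lstm_layer_params(embedding_dim if layer == 0 else hidden_size, hidden_size)
                  for layer in range(num_layers))  # 48 + 48 = 96

# nn.Linear: one weight per (LSTM output, output word) pair, plus one bias per output word
linear_params = hidden_size * vocab_size + vocab_size  # 8 + 4 = 12

total = 2 * embedding_params + 2 * lstm_params + linear_params
print(total)  # 220

# Cross-check the hand counts against PyTorch's own modules
def count(module):
    return sum(p.numel() for p in module.parameters())

assert count(nn.Embedding(vocab_size, embedding_dim)) == embedding_params
assert count(nn.LSTM(embedding_dim, hidden_size, num_layers)) == lstm_params
assert count(nn.Linear(hidden_size, vocab_size)) == linear_params
```

Almost 90% of the parameters (192 of 220) live in the two LSTMs.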


In [13]:
## First, save the weights...
trainer.save_checkpoint("seq2seq_en2es_220_trained.ckpt") ## NOTE: You can specify a path as part of the filename

In [14]:
## Now let's create a new model and load in the saved weights...
new_model = seq2seq.load_from_checkpoint("seq2seq_en2es_220_trained.ckpt")

outputs = new_model.forward(input=torch.tensor([english_token_to_id["lets"],
                                                english_token_to_id["go"]]),
                            output=None)

print("Translated text:")
predicted_ids = torch.argmax(outputs, dim=1)
for id in predicted_ids:
    print("\t", spanish_id_to_token[id.item()])

INFO: Seed set to 420


Translated text:
	 vamos
	 <EOS>
