<a href="https://colab.research.google.com/github/Series-Parallel/Machine_in_Deep_Learning/blob/main/seq2sea_model_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pip
import importlib
import subprocess
import sys

# Install Lightning on demand (e.g. on a fresh Colab runtime).
# pip is driven as a subprocess of the *current* interpreter rather than through
# `pip.main`, which is an internal entry point that pip does not support calling
# from inside a running program.
try:
    importlib.import_module("lightning")
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "lightning"])

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
import lightning as L

In [None]:
# English (source) vocabulary: each token's id is its position in this tuple.
english_token_to_id = {
    token: index
    for index, token in enumerate(("lets", "to", "go", "<EOS>"))
}

In [None]:
# Inverse lookup (id -> token) for turning English ids back into text.
english_id_to_token = {
    token_id: token for token, token_id in english_token_to_id.items()
}

In [None]:
# Spanish (target) vocabulary: each token's id is its position in this tuple.
spanish_token_to_id = {
    token: index
    for index, token in enumerate(("ir", "vamos", "y", "<EOS>"))
}

In [None]:
# Inverse lookup (id -> token) used to print the decoder's predictions as words.
spanish_id_to_token = {
    token_id: token for token, token_id in spanish_token_to_id.items()
}

In [None]:
# Training inputs: one row of English token ids per source phrase.
inputs = torch.tensor([
    [english_token_to_id[word] for word in phrase]
    for phrase in (("lets", "go"), ("to", "go"))
])

In [None]:
# Training targets: one row of Spanish token ids per phrase, in the same order
# as the rows of `inputs`; every target ends with <EOS>.
labels = torch.tensor([
    [spanish_token_to_id[word] for word in phrase]
    for phrase in (("vamos", "<EOS>"), ("ir", "<EOS>"))
])

In [None]:
# Pair each input phrase with its target phrase.
dataset = TensorDataset(inputs, labels)

# batch_size=1 is DataLoader's default; it is spelled out here because the
# model's training_step only looks at the first example of each batch.
dataloader = DataLoader(dataset, batch_size=1)

In [None]:
class seq2seq(L.LightningModule):
    """Tiny LSTM encoder-decoder that maps English token ids to Spanish token ids.

    The encoder reads the whole source sequence; its final hidden and cell
    states initialise the decoder, which is primed with the Spanish ``<EOS>``
    token and then emits one score vector per step over the 4-token target
    vocabulary.
    """

    def __init__(self, max_len=2):
        """Create the embedding, LSTM and output layers.

        Args:
            max_len: Maximum number of decoding steps (output tokens) produced
                by one call to ``forward``.
        """
        super().__init__()

        self.max_output_length = max_len

        # Seed the RNGs so the random weight initialisation below is repeatable.
        L.seed_everything(seed=420)

        ## ENCODING

        # 4 source tokens, each represented by 2 numbers.
        self.encoder_we = nn.Embedding(num_embeddings=4, embedding_dim=2)

        # input_size  = numbers per word coming from the embedding
        # hidden_size = numbers produced per word per layer
        # num_layers  = stacked LSTMs; the short-term memory of layer 1 is the
        #               input of layer 2
        self.encoder_lstm = nn.LSTM(input_size=2, hidden_size=2, num_layers=2)

        ## DECODING

        # Same layout as the encoder so its final states can seed the decoder.
        self.decoder_we = nn.Embedding(num_embeddings=4, embedding_dim=2)
        self.decoder_lstm = nn.LSTM(input_size=2, hidden_size=2, num_layers=2)

        # in_features  = outputs per decoder LSTM step
        # out_features = number of words in the output vocabulary
        self.output_fc = nn.Linear(in_features=2, out_features=4)

        ## TRAINING

        self.loss = nn.CrossEntropyLoss()

    def forward(self, input, output=None):
        """Translate one source sequence.

        Args:
            input: 1-D tensor of English token ids (a single, unbatched
                sequence).
            output: Optional 1-D tensor of the known Spanish token ids. When
                given, the known tokens are fed to the decoder (teacher
                forcing) and all ``max_output_length`` steps are run. When
                ``None``, the decoder is fed its own previous prediction and
                stops early once it predicts ``<EOS>``.

        Returns:
            Tensor with one row of 4 unnormalised vocabulary scores per
            decoding step (at most ``max_output_length`` rows).
        """
        eos_id = spanish_token_to_id["<EOS>"]

        # Encode the source; only the final hidden/cell states are needed.
        encoder_embeddings = self.encoder_we(input)
        _, (lstm_hidden, lstm_cell) = self.encoder_lstm(encoder_embeddings)

        # The decoder always starts from <EOS>; build it on the input's device
        # so the model also works when it is not on the CPU.
        decoder_token_id = torch.tensor([eos_id], device=input.device)
        decoder_lstm_output, (lstm_hidden, lstm_cell) = self.decoder_lstm(
            self.decoder_we(decoder_token_id), (lstm_hidden, lstm_cell)
        )

        output_values = self.output_fc(decoder_lstm_output)
        outputs = output_values
        predicted_id = torch.argmax(output_values, dim=1)

        for i in range(1, self.max_output_length):
            if output is None:
                # Inference: stop as soon as the model predicts <EOS>,
                # otherwise feed its own prediction back in.
                if predicted_id.item() == eos_id:
                    break
                decoder_token_id = predicted_id
            else:
                # Training: feed the known previous target token.
                decoder_token_id = output[i - 1].unsqueeze(0)

            # This step must run in BOTH modes; when it only ran in the
            # training branch, inference never produced more than one token.
            decoder_lstm_output, (lstm_hidden, lstm_cell) = self.decoder_lstm(
                self.decoder_we(decoder_token_id), (lstm_hidden, lstm_cell)
            )

            output_values = self.output_fc(decoder_lstm_output)
            outputs = torch.cat((outputs, output_values), 0)
            predicted_id = torch.argmax(output_values, dim=1)

        return outputs

    def configure_optimizers(self):
        """Return the Adam optimizer (learning rate 0.1) over all parameters."""
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Compute the cross-entropy loss for one batch.

        Only the first example of the batch is used, so the dataloader is
        expected to yield batches of size 1.
        """
        input_tokens, labels = batch
        output = self.forward(input_tokens[0], labels[0])
        loss = self.loss(output, labels[0])

        return loss

In [None]:
## Build the model with the default max_len=2; it has not been trained yet.
model = seq2seq()

In [None]:
## Translate "lets go" before any training has happened.
## The correct translation is "vamos <EOS>".
outputs = model.forward(
    input=torch.tensor([english_token_to_id[word] for word in ("lets", "go")]),
    output=None,  # no known labels: run in inference mode
)

In [None]:
print("Translated text:")
# Each row of `outputs` holds the vocabulary scores for one decoding step;
# the highest-scoring column is the predicted Spanish token id.
predicted_ids = torch.argmax(outputs, dim=1)
# Named `token_id` so the builtin `id` is not shadowed for the rest of the notebook.
for token_id in predicted_ids:
    print("\t", spanish_id_to_token[token_id.item()])

In [None]:
## Train on the CPU for 40 epochs over the dataloader built above.
trainer = L.Trainer(
    max_epochs=40,
    accelerator="cpu",
)
trainer.fit(model, train_dataloaders=dataloader)

In [None]:
## Translate "lets go" again, now with the trained model.
## The correct translation is "vamos <EOS>".
outputs = model.forward(
    input=torch.tensor([english_token_to_id[word] for word in ("lets", "go")]),
    output=None,  # no known labels: run in inference mode
)

In [None]:
print("Translated text:")
# Each row of `outputs` holds the vocabulary scores for one decoding step;
# the highest-scoring column is the predicted Spanish token id.
predicted_ids = torch.argmax(outputs, dim=1)
# Named `token_id` so the builtin `id` is not shadowed for the rest of the notebook.
for token_id in predicted_ids:
    print("\t", spanish_id_to_token[token_id.item()])

In [None]:
## Translate "to go" with the trained model.
## The correct translation is "ir <EOS>".
outputs = model.forward(
    input=torch.tensor([english_token_to_id[word] for word in ("to", "go")]),
    output=None,  # no known labels: run in inference mode
)

In [None]:
print("Translated text:")
# Each row of `outputs` holds the vocabulary scores for one decoding step;
# the highest-scoring column is the predicted Spanish token id.
predicted_ids = torch.argmax(outputs, dim=1)
# Named `token_id` so the builtin `id` is not shadowed for the rest of the notebook.
for token_id in predicted_ids:
    print("\t", spanish_id_to_token[token_id.item()])

In [None]:
## Add up the element counts of every parameter that receives gradients.
total_trainable_params = sum(
    param.numel()
    for param in model.parameters()
    if param.requires_grad
)

print("Total number of trainable parameters:", total_trainable_params)

In [None]:
## First, save the trained weights to a checkpoint file on disk...
trainer.save_checkpoint("seq2seq_en2es_220_trained.ckpt")

In [None]:
## Create a fresh model from the checkpoint file to confirm the saved weights
## can be restored and still translate.
new_model = seq2seq.load_from_checkpoint("seq2seq_en2es_220_trained.ckpt")

## Translate "lets go"; the correct translation is "vamos <EOS>".
outputs = new_model.forward(
    input=torch.tensor([english_token_to_id["lets"],
                        english_token_to_id["go"]]),
    output=None,  # no known labels: run in inference mode
)

print("Translated text:")
# Each row of `outputs` holds the vocabulary scores for one decoding step;
# the highest-scoring column is the predicted Spanish token id.
predicted_ids = torch.argmax(outputs, dim=1)
# Named `token_id` so the builtin `id` is not shadowed for the rest of the notebook.
for token_id in predicted_ids:
    print("\t", spanish_id_to_token[token_id.item()])