<a href="https://colab.research.google.com/github/benschlup/csck507_team_a/blob/main/seq2seq%20from%20tutorial%20-%20adapted.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Text Translation using an RNN

In this tutorial we will learn how to use RNNs to build a sequence-to-sequence (seq-2-seq) model for neural machine translation. We will build an LSTM-based seq-2-seq model that translates English sentences into German sentences: the encoder reads the English sentence and the decoder generates the German one.

Let us import the necessary libraries:

1. PyTorch for the LSTM layer
2. spaCy for text processing

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas
import spacy
from spacy.lang.en import English
from spacy.lang.de import German
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from tqdm import tqdm_notebook
import random
from collections import Counter
 

In [None]:
!fusermount -u drive
!google-drive-ocamlfuse drive

fusermount: failed to unmount /content/drive: No such file or directory
/bin/bash: google-drive-ocamlfuse: command not found


In [None]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
root_path = "/content/gdrive/Shareddrives/Colab_Notebooks/CSCK507/W6_Seq2SeqTutorial_Data/"

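The file "data/deu.txt" (in the data folder) consists of parallel sentences in English and German, one tab-separated pair per line, which we will use to train our seq-2-seq model. The function "prepare_data" processes the data file to obtain the token representations required by the seq-2-seq model: it tokenizes each sentence, builds a vocabulary for each language, and converts every sentence into a list of token indices.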

In [None]:
def prepare_data(root_path, file_name, train_samples):
  # Read the tab-separated English-German sentence pairs from the file
  with open(root_path+file_name, "r", encoding='utf-8') as file:
    deu = [x.rstrip("\n") for x in file]
  en = []
  de = []
  # Split each line into its English (first field) and German (second field) sentence
  for line in deu:
    fields = line.split("\t")
    en.append(fields[0])
    de.append(fields[1])
  
  # Number of training sentences to use, capped at the number of pairs in the file
  training_examples = min(train_samples, len(en))
  # We'll use spaCy's English and German tokenizers
  spacy_en = English()
  spacy_de = German()
  
  en_words = Counter()
  de_words = Counter()
  en_inputs = []
  de_inputs = []
  
  # Tokenizing the English and German sentences and creating our word banks for both languages
  for i in range(training_examples):
      en_tokens = spacy_en(en[i])
      de_tokens = spacy_de(de[i])
      if len(en_tokens)==0 or len(de_tokens)==0:
          continue
      for token in en_tokens:
          en_words.update([token.text.lower()])
      en_inputs.append([token.text.lower() for token in en_tokens] + ['_EOS'])
      for token in de_tokens:
          de_words.update([token.text.lower()])
      de_inputs.append([token.text.lower() for token in de_tokens] + ['_EOS'])
    
  # Assigning an index to each word token, including the Start Of String(SOS), End Of String(EOS) and Unknown(UNK) tokens
  en_words = ['_SOS','_EOS','_UNK'] + sorted(en_words,key=en_words.get,reverse=True)
  en_w2i = {o:i for i,o in enumerate(en_words)}
  en_i2w = {i:o for i,o in enumerate(en_words)}
  de_words = ['_SOS','_EOS','_UNK'] + sorted(de_words,key=de_words.get,reverse=True)
  de_w2i = {o:i for i,o in enumerate(de_words)}
  de_i2w = {i:o for i,o in enumerate(de_words)}
  
  # Converting our English and German sentences to their token indexes
  for i in range(len(en_inputs)):
      en_sentence = en_inputs[i]
      de_sentence = de_inputs[i]
      en_inputs[i] = [en_w2i[word] for word in en_sentence]
      de_inputs[i] = [de_w2i[word] for word in de_sentence]
  
  return en_words, de_words, en_w2i, en_i2w, de_w2i, de_i2w, en_inputs, de_inputs


In [None]:
en_words, de_words, en_w2i, en_i2w, de_w2i, de_i2w, en_inputs, de_inputs = prepare_data(root_path, "data/deu.txt", 10000)

en_inputs[0], de_inputs[0]

([22, 3, 1], [65, 3, 1])
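
Each number above is a position in the frequency-ranked vocabulary, with `1` being `_EOS`. The following is a minimal sketch of the same indexing scheme on a toy corpus. It assumes whitespace tokenization instead of spaCy, and `build_vocab`, `encode`, and the sample sentences are hypothetical names introduced only for illustration. Unlike `prepare_data`, the sketch also maps unseen words to `_UNK`.

```python
from collections import Counter

SPECIALS = ['_SOS', '_EOS', '_UNK']

def build_vocab(sentences):
    """Return (w2i, i2w): special tokens first, then words by descending frequency."""
    counts = Counter(w for s in sentences for w in s.lower().split())
    words = SPECIALS + sorted(counts, key=counts.get, reverse=True)
    w2i = {w: i for i, w in enumerate(words)}
    i2w = {i: w for i, w in enumerate(words)}
    return w2i, i2w

def encode(sentence, w2i):
    """Map a sentence to indices and append _EOS; unseen words become _UNK."""
    unk = w2i['_UNK']
    return [w2i.get(w, unk) for w in sentence.lower().split()] + [w2i['_EOS']]

toy = ["go .", "go away .", "run ."]   # made-up sentences
w2i, i2w = build_vocab(toy)
ids = encode("go .", w2i)
decoded = [i2w[i] for i in ids]
print(ids, decoded)
```

Because the most frequent words receive the smallest indices, short index lists such as the ones above are typical for short, common sentences.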

Let's write our Encoder class

In [None]:
class EncoderLSTM(nn.Module):
  def __init__(self, vocab_len, input_dim, hidden_dim, n_layers=1, drop_prob=0):
    super(EncoderLSTM, self).__init__()

    self.input_dim = input_dim
    self.hidden_dim = hidden_dim
    self.n_layers = n_layers
 
    self.embedding = nn.Embedding(vocab_len, input_dim)
    self.lstm = nn.LSTM(input_dim, hidden_dim, n_layers, 
                        dropout=drop_prob, batch_first=True)
 
  def forward(self, inputs, encoder_state_vector, encoder_cell_vector):
    embedded = self.embedding(inputs)
    # Pass the embedded word vectors into LSTM and return all outputs
    output, hidden = self.lstm(embedded, (encoder_state_vector, encoder_cell_vector))
    return output, hidden
 
  def init_hidden(self, batch_size=1):
    return (torch.zeros(self.n_layers, batch_size, 
                        self.hidden_dim),
            torch.zeros(self.n_layers, batch_size, 
                        self.hidden_dim))
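
To make the tensor shapes in `EncoderLSTM` concrete, here is a small sketch that runs an embedding and a single-layer LSTM with `batch_first=True` on one three-token sentence. The sizes (`vocab_len=50`, `input_dim=8`, `hidden_dim=16`) are toy values assumed for illustration, not the ones used in this tutorial.

```python
import torch
import torch.nn as nn

vocab_len, input_dim, hidden_dim = 50, 8, 16   # toy sizes (assumed)
embedding = nn.Embedding(vocab_len, input_dim)
lstm = nn.LSTM(input_dim, hidden_dim, num_layers=1, batch_first=True)

tokens = torch.tensor([[22, 3, 1]])            # shape (batch=1, seq_len=3)
h0 = torch.zeros(1, 1, hidden_dim)             # (n_layers, batch, hidden_dim)
c0 = torch.zeros(1, 1, hidden_dim)

# output holds the hidden state at every time step;
# h_n and c_n hold only the final hidden and cell states
output, (h_n, c_n) = lstm(embedding(tokens), (h0, c0))
print(output.shape, h_n.shape, c_n.shape)
```

For a single-layer, unidirectional LSTM the last time step of `output` equals `h_n`, which is why the final `(h_n, c_n)` pair is enough to initialise the decoder.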


Let's write our Decoder class

In [None]:
class DecoderLSTM(nn.Module):
  def __init__(self, input_dim, hidden_dim, output_vocab_len, n_layers=1, drop_prob=0.1):
    super(DecoderLSTM, self).__init__()
    self.hidden_dim = hidden_dim
    self.output_vocab_len = output_vocab_len
    self.n_layers = n_layers
    self.drop_prob = drop_prob
    self.input_dim = input_dim
 
    self.embedding = nn.Embedding(self.output_vocab_len, self.input_dim)
    self.dropout = nn.Dropout(self.drop_prob) 
    self.lstm = nn.LSTM(self.input_dim, self.hidden_dim, batch_first=True)
    self.classifier = nn.Linear(self.hidden_dim, self.output_vocab_len)

  def forward(self, inputs, decoder_state_vector, decoder_context_vector):
    # Embed input words
    embedded = self.embedding(inputs).view(1, -1)
    embedded = self.dropout(embedded)
    embedded = embedded.unsqueeze(0)
    
    output, hidden = self.lstm(embedded, (decoder_state_vector, 
                                          decoder_context_vector))

    # Pass LSTM outputs through a Linear layer acting as a classifier
    output = F.log_softmax(self.classifier(output[0]), dim=1)

    return output, hidden
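
The decoder returns log-probabilities over the output vocabulary for a single step. The sketch below shows how those values relate to the greedy prediction and to the per-token loss. It assumes a made-up six-word vocabulary and random scores standing in for the classifier output.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(1, 6)                 # stand-in for classifier output, shape (1, vocab)
log_probs = F.log_softmax(logits, dim=1)   # same normalisation as in DecoderLSTM.forward

# Greedy choice: index of the highest-scoring word
top_value, top_index = log_probs.topk(1)

# Negative log-likelihood of an assumed target word with index 2
target = torch.tensor([2])
loss = F.nll_loss(log_probs, target)
print(top_index.item(), loss.item())
```

Exponentiating `log_probs` gives a distribution that sums to 1, and `F.nll_loss` simply picks out the negated log-probability of the target index.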



Let's train our model and save the trained model to the "model" directory.

In [None]:
input_dim = 100
hidden_dim = 256

encoder = EncoderLSTM(len(en_words), input_dim, hidden_dim)
decoder = DecoderLSTM(input_dim, hidden_dim, len(de_words))
 
lr = 0.001
encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr)

EPOCHS = 10
teacher_forcing_prob = 0.5
encoder.train()
decoder.train()
tk0 = range(1,EPOCHS+1)
for epoch in tk0:
    avg_loss = 0.
    tk1 = enumerate(en_inputs)

    for i, sentence in tk1:

        loss = 0.

        # Initialise the encoder hidden state and cell state
        h = encoder.init_hidden()
        encoder_state_vector = h[0]
        encoder_cell_vector = h[1]

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        inp = torch.tensor(sentence).unsqueeze(0)

        #print('inp: ', epoch, inp)
        if (i % 100) == 0:
          print('inp: ', i, epoch)
        encoder_outputs, h = encoder(inp, encoder_state_vector, encoder_cell_vector)
        
        # First decoder input will be the SOS token of the target (German) vocabulary
        decoder_input = torch.tensor([de_w2i['_SOS']])
        # First decoder hidden and cell states will be the last encoder states
        decoder_hidden = h

        output = []
        # With probability teacher_forcing_prob, feed the ground-truth word at each step
        teacher_forcing = random.random() < teacher_forcing_prob

        for ii in range(len(de_inputs[i])):
          decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden[0], decoder_hidden[1])

          # Get the index value of the word with the highest score from the decoder output
          top_value, top_index = decoder_output.topk(1)
          if teacher_forcing:
            decoder_input = torch.tensor([de_inputs[i][ii]])
          else:
            decoder_input = torch.tensor([top_index.item()])
            
          output.append(top_index.item())
          # Calculate the loss of the prediction against the actual word
          loss += F.nll_loss(decoder_output.view(1,-1), torch.tensor([de_inputs[i][ii]]))

        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()
        avg_loss += loss.item()/len(en_inputs)
    print(avg_loss)

# Save the model and optimizer states once training has finished
torch.save({"encoder": encoder.state_dict(),
            "decoder": decoder.state_dict(),
            "e_optimizer": encoder_optimizer.state_dict(),
            "d_optimizer": decoder_optimizer.state_dict()},
           root_path+"model/model_enc_dec.pt")


inp:  0 1
inp:  100 1
inp:  200 1
inp:  300 1
inp:  400 1
inp:  500 1
inp:  600 1
inp:  700 1
inp:  800 1
inp:  900 1
inp:  1000 1
inp:  1100 1
inp:  1200 1
inp:  1300 1
inp:  1400 1
inp:  1500 1
inp:  1600 1
inp:  1700 1
inp:  1800 1
inp:  1900 1
inp:  2000 1
inp:  2100 1
inp:  2200 1
inp:  2300 1
inp:  2400 1
inp:  2500 1
inp:  2600 1
inp:  2700 1
inp:  2800 1
inp:  2900 1
inp:  3000 1
inp:  3100 1
inp:  3200 1
inp:  3300 1
inp:  3400 1
inp:  3500 1
inp:  3600 1
inp:  3700 1
inp:  3800 1
inp:  3900 1
inp:  4000 1
inp:  4100 1
inp:  4200 1
inp:  4300 1
inp:  4400 1
inp:  4500 1
inp:  4600 1
inp:  4700 1
inp:  4800 1
inp:  4900 1
inp:  5000 1
inp:  5100 1
inp:  5200 1
inp:  5300 1
inp:  5400 1
inp:  5500 1
inp:  5600 1
inp:  5700 1
inp:  5800 1
inp:  5900 1
inp:  6000 1
inp:  6100 1
inp:  6200 1
inp:  6300 1
inp:  6400 1
inp:  6500 1
inp:  6600 1
inp:  6700 1
inp:  6800 1
inp:  6900 1
inp:  7000 1
inp:  7100 1
inp:  7200 1
inp:  7300 1
inp:  7400 1
inp:  7500 1
inp:  7600 1
inp:  7700 
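
Checkpoints are easiest to restore when every entry is a `state_dict()`. The following sketch round-trips a model and an Adam optimizer through `torch.save` and `torch.load`. It uses a small `nn.Linear` layer and an in-memory `io.BytesIO` buffer as stand-ins (both assumptions for illustration) rather than the encoder, decoder, and Google Drive path used in this notebook.

```python
import io
import torch
import torch.nn as nn
import torch.optim as optim

model = nn.Linear(4, 2)                       # stand-in for the encoder or decoder
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Save only state dicts, not the model or optimizer objects themselves
buffer = io.BytesIO()                         # in-memory stand-in for a file path
torch.save({"model": model.state_dict(),
            "optimizer": optimizer.state_dict()}, buffer)

# Restore into freshly constructed objects
buffer.seek(0)
checkpoint = torch.load(buffer)
restored = nn.Linear(4, 2)
restored_optimizer = optim.Adam(restored.parameters(), lr=0.1)
restored.load_state_dict(checkpoint["model"])
restored_optimizer.load_state_dict(checkpoint["optimizer"])
print(restored_optimizer.param_groups[0]["lr"])
```

Loading the optimizer state also restores its hyperparameters, so the learning rate passed when constructing `restored_optimizer` is overwritten by the saved one.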

Use the trained model to check the translations of some random sentences from the corpus.

In [None]:
checkpoint = torch.load(root_path+"model/model_enc_dec.pt")

encoder.load_state_dict(checkpoint['encoder'])
decoder.load_state_dict(checkpoint['decoder'])
encoder_optimizer.load_state_dict(checkpoint['e_optimizer'])
#decoder_optimizer.load_state_dict(checkpoint['d_optimizer'])

encoder.eval()
decoder.eval()

# Pick random sentence indices; randrange keeps them within the bounds of en_inputs
rand_integers = [random.randrange(len(en_inputs)) for _ in range(19)]

for i in rand_integers:
  h = encoder.init_hidden()
  inp = torch.tensor(en_inputs[i]).unsqueeze(0)
  encoder_outputs, h = encoder(inp, h[0], h[1])
   
  # Start decoding from the SOS token of the target (German) vocabulary
  decoder_input = torch.tensor([de_w2i['_SOS']])
  decoder_hidden = h
  output = []
  max_len = 50  # safety cap in case the decoder never emits _EOS
  with torch.no_grad():
    for _ in range(max_len):
      decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden[0], decoder_hidden[1])
      _, top_index = decoder_output.topk(1)
      decoder_input = torch.tensor([top_index.item()])
      # Stop decoding once the decoder emits the End Of Sentence token
      if top_index.item() == de_w2i["_EOS"]:
        break
      output.append(top_index.item())
  
  print("English: "+ " ".join([en_i2w[x] for x in en_inputs[i]]))
  print("Predicted: " + " ".join([de_i2w[x] for x in output]))
  print("Actual: " + " ".join([de_i2w[x] for x in de_inputs[i]]))
  print()
   


English: i want a lot . _EOS
Predicted: ich möchte viel .
Actual: ich möchte viel . _EOS

English: life is unfair . _EOS
Predicted: das leben ist unfair .
Actual: das leben ist unfair . _EOS

English: tom broke it . _EOS
Predicted: tom hat es ruiniert . gegeben .
Actual: tom hat es kaputt gemacht . _EOS

English: you can rest . _EOS
Predicted: du können dich ausruhen .
Actual: du kannst ausruhen . _EOS

English: is it a deer ? _EOS
Predicted: ist es ein geheimnis ?
Actual: ist es ein hirsch ? _EOS

English: take tom . _EOS
Predicted: nimm tom um .
Actual: nimm tom . _EOS

English: you 're lost . _EOS
Predicted: ihr habt euch verirrt .
Actual: du hast dich verirrt . _EOS

English: life is fun . _EOS
Predicted: das leben macht spaß .
Actual: das leben macht spaß . _EOS

English: i like women . _EOS
Predicted: ich mag gern .
Actual: ich mag frauen . _EOS

English: i know my job . _EOS
Predicted: ich kenne mich in meinem beruf .
Actual: ich kenne mich in meinem beruf aus . _EOS

English: i