<a href="https://colab.research.google.com/github/hauduong05/NLP_basics/blob/main/machine_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab-only: mount Google Drive so files under MyDrive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Explicit line magic: a bare `cd` relies on automagic, which IPython only applies to single-line cells.
%cd /content/drive/MyDrive/NMT_data/

/content/drive/MyDrive/NMT_data


In [3]:
import unicodedata
import torch
import torch.nn as nn
import random
import matplotlib.pyplot as plt

In [4]:
# Reserved vocabulary indices; they line up with the first three idx2word entries of Vocab.
pad_token, sos_token, eos_token = 0, 1, 2

class Vocab:
  """Word <-> index bookkeeping for one language.

  Indices 0, 1 and 2 are reserved for the 'pad', 'sos' and 'eos' entries, so
  the first real word receives index 3.

  Attributes:
    name: label given at construction time.
    word2idx: word -> integer index.
    word2count: word -> number of times it was added.
    idx2word: integer index -> word (includes the three reserved entries).
    n_words: number of indices in use (reserved entries included).
    max_length: largest token count among the sentences added so far.
  """

  def __init__(self, name):
    super().__init__()
    self.name = name
    self.word2idx = {}
    self.word2count = {}
    self.idx2word = {0: 'pad', 1: 'sos', 2: 'eos'}
    # Next free index sits right after the reserved entries.
    self.n_words = len(self.idx2word)
    self.max_length = 0

  def addSentence(self, sentence):
    """Split `sentence` on single spaces and register every resulting token."""
    tokens = sentence.split(" ")
    if len(tokens) > self.max_length:
      self.max_length = len(tokens)
    for token in tokens:
      self.addWord(token)

  def addWord(self, word):
    """Register one occurrence of `word`, assigning a new index on first sight."""
    if word in self.word2idx:
      self.word2count[word] += 1
      return
    new_index = self.n_words
    self.word2idx[word] = new_index
    self.word2count[word] = 1
    self.idx2word[new_index] = word
    self.n_words = new_index + 1

In [5]:
def no_space(char, prev_char):
  """Tell whether `char` is one of ',.;?!' that is not already preceded by a space."""
  if char not in ',.;?!':
    return False
  return prev_char != ' '

def read_data(file):
  """Return the full contents of `file`, decoded as UTF-8 text."""
  with open(file, encoding='utf-8') as handle:
    contents = handle.read()
  return contents

def preprocessing(data, pad_token):
  """Turn the raw tab-separated corpus into sentence pairs and fill both vocabularies.

  Steps:
    1. NFKD-normalise the text.
    2. Insert a space in front of any of ',.;?!' that is glued to the previous
       character, so punctuation becomes its own token when splitting on spaces.
    3. Split each non-blank line on tabs; field 0 is added to the global `Eng`
       vocabulary and field 1 to the global `Fra` vocabulary.

  Args:
    data: whole corpus as one string, one tab-separated pair per line.
    pad_token: unused; kept so existing callers keep working.

  Returns:
    List of lists, one per non-blank line, holding that line's tab-separated fields.

  Raises:
    IndexError: if a non-blank line contains no tab (there is no second field).
  """
  data = unicodedata.normalize('NFKD', data)
  # The generator reads the pre-spacing `data`; the name is rebound only after join() finishes.
  data = ''.join(' ' + char if i > 0 and no_space(char, data[i - 1]) else char
                 for i, char in enumerate(data))
  # Blank (whitespace-only) lines carry no pair and would otherwise fail on pair[1].
  pairs = [line.split('\t') for line in data.splitlines() if line.strip()]
  for pair in pairs:
    Eng.addSentence(pair[0])
    Fra.addSentence(pair[1])
  return pairs

In [6]:
class Encoder(nn.Module):
  """Single-layer GRU encoder that consumes one token index per call.

  Args:
    input_size: number of rows in the embedding table.
    hidden_size: width of both the embeddings and the GRU state.
  """

  def __init__(self, input_size, hidden_size):
    super().__init__()
    self.hidden_size = hidden_size
    # Embedding width equals the GRU width, so embeddings feed the GRU directly.
    self.Embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
    self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size)

  def forward(self, input, hidden):
    """Embed `input`, reshape it to (1, 1, -1) and advance the GRU one step.

    Returns:
      The `(output, hidden)` tuple produced by the GRU.
    """
    step_input = self.Embedding(input).view(1, 1, -1)
    return self.gru(step_input, hidden)

  def init_hidden(self):
    """Return an all-zero initial state of shape (1, 1, hidden_size)."""
    return torch.zeros((1, 1, self.hidden_size))

In [7]:
class Decoder(nn.Module):
  """Single-layer GRU decoder that emits log-probabilities for one step at a time.

  Args:
    hidden_size: width of both the embeddings and the GRU state.
    output_size: number of rows in the embedding table and of output scores.
  """

  def __init__(self, hidden_size, output_size):
    super().__init__()
    self.hidden_size = hidden_size
    self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=hidden_size)
    self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size)
    self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Advance one decoding step.

    Returns:
      `(log_probs, hidden)`, where `log_probs` is the log-softmax (over dim 1)
      of the linear layer applied to the first GRU output step.
    """
    step_input = self.embedding(input).view(1, 1, -1)
    gru_out, hidden = self.gru(step_input, hidden)
    scores = self.fc(gru_out[0])
    return self.softmax(scores), hidden

  def init_hidden(self):
    """Return an all-zero initial state of shape (1, 1, hidden_size)."""
    return torch.zeros((1, 1, self.hidden_size))

In [8]:
def indexfromsentences(vocab, sentence):
  """Look up the `vocab.word2idx` index of every space-separated token in `sentence`.

  Raises:
    KeyError: if a token is missing from `vocab.word2idx`.
  """
  indices = []
  for token in sentence.split(' '):
    indices.append(vocab.word2idx[token])
  return indices

def tensorfromsentence(vocab, sentence):
  """Encode `sentence` as a column tensor of word indices terminated by `eos_token`.

  Returns:
    A `torch.long` tensor reshaped to (-1, 1): one row per token plus the EOS row.
  """
  indices = indexfromsentences(vocab, sentence) + [eos_token]
  column = torch.tensor(indices, dtype=torch.long)
  return column.view(-1, 1)

def tensorfrompair(pair):
  """Convert a sentence pair into an `(input_tensor, output_tensor)` tuple.

  `pair[0]` is encoded with the global `Eng` vocabulary and `pair[1]` with the
  global `Fra` vocabulary.
  """
  source_sentence = pair[0]
  target_sentence = pair[1]
  return (
    tensorfromsentence(Eng, source_sentence),
    tensorfromsentence(Fra, target_sentence),
  )

In [9]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion):
  """Run one optimisation step on a single sentence pair, always teacher-forced.

  The encoder reads the input one row at a time; its final hidden state seeds
  the decoder. At every target position the decoder is fed the previous gold
  token (the global `sos_token` first), and the per-position losses are summed
  before back-propagation.

  Returns:
    The summed loss as a Python float divided by the number of target rows.
  """
  n_inputs = input_tensor.size(0)
  n_targets = target_tensor.size(0)

  encoder_optimizer.zero_grad()
  decoder_optimizer.zero_grad()

  # Encoding pass: only the final hidden state is kept.
  state = encoder.init_hidden()
  for position in range(n_inputs):
    _, state = encoder(input_tensor[position], state)

  # Decoding pass, starting from the encoder's final state.
  previous_token = torch.tensor([[sos_token]])
  total_loss = 0
  for position in range(n_targets):
    gold_token = target_tensor[position]
    log_probs, state = decoder(previous_token, state)
    total_loss += criterion(log_probs, gold_token)
    # Teacher forcing: the next input is the gold token, not the model's own prediction.
    previous_token = gold_token

  total_loss.backward()

  encoder_optimizer.step()
  decoder_optimizer.step()

  return total_loss.item() / n_targets

In [10]:
def trainIters(encoder, decoder, n_iters, print_every = 100, plot_every = 100, learning_rate = 0.01):
  """Train the encoder/decoder on `n_iters` randomly sampled sentence pairs.

  Each iteration draws one pair from the global `pairs` list (with replacement),
  runs a single `train` step with plain SGD and NLL loss, and accumulates the
  returned loss for reporting. When training ends the collected averages are
  passed to `showPlot`.

  Args:
    encoder: module whose parameters are optimised by the first SGD optimiser.
    decoder: module whose parameters are optimised by the second SGD optimiser.
    n_iters: number of training steps (one sentence pair per step).
    print_every: print the average loss once every this many steps.
    plot_every: record one averaged plot point once every this many steps.
    learning_rate: learning rate used for both optimisers.
  """
  print_loss_total = 0
  plot_loss_total = 0
  plot_losses = []

  criterion = nn.NLLLoss()
  encoder_optimizer = torch.optim.SGD(encoder.parameters(), lr=learning_rate)
  decoder_optimizer = torch.optim.SGD(decoder.parameters(), lr=learning_rate)

  # Sample (and tensorise) the whole training schedule up front.
  training_pairs = [tensorfrompair(random.choice(pairs)) for _ in range(n_iters)]
  for step, training_pair in enumerate(training_pairs, start=1):
    input_tensor = training_pair[0]
    target_tensor = training_pair[1]
    loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion)
    print_loss_total += loss
    plot_loss_total += loss

    if step % print_every == 0:
      print_loss_avg = print_loss_total / print_every
      print_loss_total = 0
      print(f'iter : {step} loss = {print_loss_avg:.3f}')

    # One plot point per full window of `plot_every` steps; the `== 0` check
    # keeps partial windows from being recorded (and divided by the full window size).
    if step % plot_every == 0:
      plot_loss_avg = plot_loss_total / plot_every
      plot_losses.append(plot_loss_avg)
      plot_loss_total = 0

  showPlot(plot_losses)

In [11]:
def showPlot(plot_losses):
  """Draw `plot_losses` as a line chart in a new figure and display it."""
  _, ax = plt.subplots()
  ax.plot(plot_losses)
  ax.set_xlabel('Iteration')
  ax.set_ylabel('Loss')
  plt.show()

In [12]:
# Load the raw corpus; the relative path depends on the notebook's current working directory.
data = read_data('eng-fra.txt')
# Both vocabularies must exist first: preprocessing() fills the Eng/Fra globals while it parses.
Eng = Vocab('eng')
Fra = Vocab('fra')
pairs = preprocessing(data, pad_token)
# Width shared by the embeddings and GRU state of both encoder and decoder.
hidden_size = 256

In [None]:
# Vocabulary sizes determine the embedding-table size of the encoder and the output size of the decoder.
encoder = Encoder(Eng.n_words, hidden_size)
decoder = Decoder(hidden_size, Fra.n_words)
# 50000 training steps, one randomly sampled sentence pair per step.
trainIters(encoder, decoder, 50000)

In [1]:
def evaluate(encoder, decoder, sentence, max_length):
  """Greedily translate `sentence` and return the produced words.

  The sentence is encoded with the global `Eng` vocabulary and fed through the
  encoder one row at a time. Starting from `sos_token`, the decoder then emits
  the highest-scoring token at each step and receives it back as its next
  input. Decoding stops as soon as `eos_token` is produced or after
  `max_length` steps, whichever comes first.

  Args:
    encoder: trained encoder exposing `init_hidden()` and a call interface.
    decoder: trained decoder returning `(scores, hidden)` per step.
    sentence: source sentence; every token must exist in `Eng.word2idx`.
    max_length: maximum number of decoding steps.

  Returns:
    List of output words looked up in `Fra.idx2word`; the final element is
    '<EOS>' when the decoder terminated on its own.
  """
  with torch.no_grad():
    input_tensor = tensorfromsentence(Eng, sentence)
    input_length = input_tensor.size()[0]
    encoder_hidden = encoder.init_hidden()
    decoded_words = []

    for i in range(input_length):
      _, encoder_hidden = encoder(input_tensor[i], encoder_hidden)

    decoder_hidden = encoder_hidden
    decoder_input = torch.tensor([[sos_token]])

    for _ in range(max_length):
      decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
      _, topi = decoder_output.topk(1)
      token = topi.item()
      if token == eos_token:
        decoded_words.append('<EOS>')
        # End of sentence reached: anything decoded past this point would be noise.
        break
      decoded_words.append(Fra.idx2word[token])
      # Greedy decoding: the prediction becomes the next decoder input.
      decoder_input = topi.squeeze().detach()

  return decoded_words