<a href="https://colab.research.google.com/github/archyyu/translation-from-RNN-to-transformer/blob/main/machine_translation_by_GRU_with_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
import matplotlib.pyplot as plt # for making figures
%matplotlib inline

# Set random seed for reproducibility
torch.manual_seed(42)

<torch._C.Generator at 0x7948ec7a8210>

In [2]:
hidden_size = 100
embedding_dim = 30
learning_rate = 0.001
batch_size = 50
beam_width = 3

In [20]:
url = "https://raw.githubusercontent.com/archyyu/publicResource/main/eng-fra.txt"
response = requests.get(url)
lines = response.text.split('\n')
en_lines = []
fr_lines = []

start_character = '<'
end_character = '>'
padding_character = '&'

for i in range(20000,30000):
  item = lines[i].split('\t')
  en_lines.append(item[0] + '>')
  fr_lines.append(item[1] + '>')

max_len_line_en = max([len(l) for l in en_lines])
max_len_line_fr = max([len(l) for l in fr_lines])

for i in range(len(en_lines)):
  if (len(en_lines[i]) < max_len_line_en):
    en_lines[i] = en_lines[i].ljust(max_len_line_en, padding_character)
  if (len(fr_lines[i]) < max_len_line_fr):
    fr_lines[i] = fr_lines[i].ljust(max_len_line_fr, padding_character)


source_vocab = sorted(set(''.join(en_lines)))
target_vocab = sorted(set(''.join(fr_lines)))

target_vocab.append('<')

source_vocab_size = len(set(''.join(source_vocab)))
target_vocab_size = len(set(''.join(target_vocab)))

source_char_to_ix = {ch: i for i, ch in enumerate(source_vocab)}
source_ix_to_char = {i: ch for i, ch in enumerate(source_vocab)}

target_char_to_ix = {ch: i for i, ch in enumerate(target_vocab)}
target_ix_to_char = {i: ch for i, ch in enumerate(target_vocab)}

padding_token_index = target_char_to_ix[padding_character]

In [21]:
def line_to_tensor(line):
  result = []
  line_ten = torch.tensor([source_char_to_ix[ch] for ch in test_line], dtype=torch.long).view(1, -1)
  result.append(line_ten)
  return torch.cat(result, dim=0)

def target_line_to_tensor(line):
  result = []
  line_ten = torch.tensor([target_char_to_ix[ch] for ch in test_line], dtype=torch.long).view(1, -1)
  result.append(line_ten)
  return torch.cat(result, dim=0)

en_data = []
fr_data = []
for i in range(len(en_lines)):
  e = torch.tensor([source_char_to_ix[ch] for ch in en_lines[i]], dtype=torch.long).view(1, -1)
  en_data.append(e)
  f = torch.tensor([target_char_to_ix[ch] for ch in fr_lines[i]], dtype=torch.long).view(1, -1)
  fr_data.append(f)

en_data = torch.cat(en_data, dim=0)
fr_data = torch.cat(fr_data, dim=0)

In [22]:
class Attention(nn.Module):
  def __init__(self, hidden_size):
    super(Attention, self).__init__()
    self.attn = nn.Linear(hidden_size * 2, hidden_size)
    self.v = nn.Parameter(torch.rand(hidden_size))

  def forward(self, hidden, encoder_outputs):
    seq_len = encoder_outputs.size(1)
    hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
    energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=-1)))
    attention_scores = torch.matmul(energy, self.v)
    attention_weights = torch.softmax(attention_scores, dim=1)
    context_vector = torch.sum(attention_weights.unsqueeze(2) * encoder_outputs, dim=1)
    return context_vector

class Encoder(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_size):
    super(Encoder, self).__init__()
    self.hidden_size = hidden_size
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    self.Wr = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hr = nn.Linear(hidden_size, hidden_size,bias=False)
    self.Wz = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hz = nn.Linear(hidden_size, hidden_size,bias=False)
    self.Wh = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hh = nn.Linear(hidden_size, hidden_size,bias=False)
    self.rb = nn.Parameter(torch.zeros(1, hidden_size))
    self.zb = nn.Parameter(torch.zeros(1, hidden_size))
    self.hb = nn.Parameter(torch.zeros(1, hidden_size))

  def forward(self, x):
    h_prev = torch.zeros(1, self.hidden_size)
    h_list = []
    for i in range(x.shape[1]):
      t = self.embedding(x[:,i])
      rt = torch.sigmoid(self.Wr(t) + self.Hr(h_prev) + self.rb)
      zt = torch.sigmoid(self.Wz(t) + self.Hz(h_prev) + self.zb)

      tht = torch.tanh(self.Wh(t) + rt * self.Hh(h_prev) + self.hb)
      h_prev = zt * tht + (1 - zt) * h_prev
      h_list.append(h_prev)
    return torch.stack(h_list, dim=0)



class Decoder(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_size):
    super(Decoder, self).__init__()
    self.hidden_size = hidden_size
    self.embedding_dim = embedding_dim
    self.embedding = nn.Embedding(vocab_size, self.embedding_dim)

    self.Wr = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hr = nn.Linear(hidden_size, hidden_size,bias=False)
    self.Wz = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hz = nn.Linear(hidden_size, hidden_size,bias=False)
    self.Wh = nn.Linear(embedding_dim, hidden_size, bias=False)
    self.Hh = nn.Linear(hidden_size, hidden_size,bias=False)
    self.rb = nn.Parameter(torch.zeros(1, hidden_size))
    self.zb = nn.Parameter(torch.zeros(1, hidden_size))
    self.hb = nn.Parameter(torch.zeros(1, hidden_size))
    self.ob = nn.Parameter(torch.zeros(1, vocab_size))
    self.Ho = nn.Linear(hidden_size, vocab_size)
    self.e2d = nn.Linear(self.hidden_size, vocab_size, bias=False)
    self.att = Attention(hidden_size)

  def init_state(self, encode_state):
    T,B,C = encode_state.shape
    self.encode_state = encode_state.reshape(B,T,C)


  def forward(self, batch_size):

    # if x is None:
    h_prev = torch.zeros(batch_size, self.hidden_size)
    x = torch.tensor([target_char_to_ix[start_character] for _ in range(batch_size)],dtype=torch.long)
    output = []
    for i in range(max_len_line_fr):
      t = self.embedding(x)

      rt = torch.sigmoid(self.Wr(t) + self.Hr(h_prev) + self.rb)
      zt = torch.sigmoid(self.Wz(t) + self.Hz(h_prev) + self.zb)

      tht = torch.tanh(self.Wh(t) + rt * self.Hh(h_prev) + self.hb)
      h_prev = zt * tht + (1 - zt) * h_prev

      context_state = self.att(h_prev, self.encode_state)
      y = self.e2d(context_state) + self.Ho(h_prev) + self.ob
      p = nn.functional.softmax(y, dim=1)
      ix = torch.argmax(p, dim=-1)
      x = ix
      output.append(y)
    return torch.stack(output, dim=0).permute(1, 0, 2)

  def beam_search(self):
    """
    Perform beam search to generate sequences.
    """
    beams = [(torch.tensor([target_char_to_ix[start_character]], dtype=torch.long), 1.0)]
    h = torch.zeros(1, self.hidden_size)

    for i in range(max_len_line_fr):
      new_beams = []

      for seq, score in beams:
        x = seq[-1].view(1, -1)  # Take the last predicted token

        t = self.embedding(x)
        rt = torch.sigmoid(self.Wr(t) + self.Hr(h) + self.rb)
        zt = torch.sigmoid(self.Wz(t) + self.Hz(h) + self.zb)

        tht = torch.tanh(self.Wh(t) + rt * self.Hh(h) + self.hb)
        h = zt * tht + (1 - zt) * h

        context_state = self.att(h.reshape(-1, self.hidden_size), self.encode_state)
        y = self.e2d(context_state) + self.Ho(h) + self.ob
        p = F.softmax(y, dim=-1)
        top_probs, top_ix = torch.topk(p, beam_width, dim=-1)

        for prob, token_ix in zip(top_probs[0][0], top_ix[0][0]):
          new_seq = torch.cat((seq, torch.tensor([token_ix], dtype=torch.long)), dim=0)
          new_beams.append((new_seq, score * prob.item()))

      beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]

    return beams

class Seq2Seq(nn.Module):
  def __init__(self, source_vocab_size, target_vocab_size, embedding_dim, hidden_size):
    super(Seq2Seq, self).__init__()
    self.embedding_dim = embedding_dim
    self.hidden_size = hidden_size
    self.encoder = Encoder(source_vocab_size, self.embedding_dim, self.hidden_size)
    self.decoder = Decoder(target_vocab_size, self.embedding_dim, self.hidden_size)
  def forward(self, source, batch_size):
    hidden_state = self.encoder(source)
    self.decoder.init_state(hidden_state)
    output = self.decoder(batch_size)
    return output

  def translate(self, source):
    hidden_state = self.encoder(source)
    self.decoder.init_state(hidden_state)
    beams = self.decoder.beam_search()
    return beams


# Define your model, loss function, and optimizer
model = Seq2Seq(source_vocab_size, target_vocab_size, embedding_dim, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
#training
import torch.optim as optim

num_epochs = 30

# Training loop
for epoch in range(num_epochs):
  for p in range(0,len(en_data) - batch_size - 1,batch_size):

    source_batch = en_data[p:p+batch_size]
    target_batch = fr_data[p:p+batch_size]

    optimizer.zero_grad()
    # encoder = Encoder(source_vocab_size, embedding_dim, hidden_size)
    # output = encoder(source_batch)
    output = model(source_batch, batch_size)

    output = output.reshape(-1, target_vocab_size)
    # remove the padding tokens when calculate the loss
    # Create a mask to ignore padding tokens
    padding_mask = (target_batch != padding_token_index).float()

    # Compute the loss with the padding mask
    loss = criterion(output, target_batch.view(-1))
    loss = (loss * padding_mask.view(-1)).sum() / padding_mask.sum()

    loss.backward()
    for param in model.parameters():
      if param.grad is not None:
        param.grad.data.clamp_(-5, 5)
    optimizer.step()

    if p%500 == 0:
      # Print or log the training loss for each epoch
      print(f'p {p}, Loss: {loss.item()}')



p 0, Loss: 4.638576984405518
p 500, Loss: 3.0082247257232666
p 1000, Loss: 2.1656718254089355
p 1500, Loss: 1.8153971433639526
p 2000, Loss: 2.1199541091918945
p 2500, Loss: 1.8754996061325073
p 3000, Loss: 1.7304545640945435
p 3500, Loss: 1.7717962265014648
p 4000, Loss: 1.4608933925628662
p 4500, Loss: 1.6876193284988403
p 5000, Loss: 1.6519733667373657
p 5500, Loss: 1.2705917358398438


In [19]:
test_line = "I feel gidd>&"

input = line_to_tensor(test_line)

outputs = model.translate(input)
for tensor,p in outputs:
  result = [target_ix_to_char[j.item()] for j in tensor]
  print(''.join(result))

# outputs = model(input,1)
# result = []
# for i in range(outputs.shape[0]):

#   p = nn.functional.softmax(outputs[i], dim=-1).detach().numpy().ravel()
#   ix = np.random.choice(range(target_vocab_size), p=p)

#   result.append(target_ix_to_char[ix])

# print(''.join(result))

<Je  e&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
<Je    &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
<Je  s&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&


In [None]:
en_lines

I changed the RNN to GRU
and train it again to check the result