<a href="https://colab.research.google.com/github/akkasi/EnglishToGerman-Translator/blob/master/BasicModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
!pip install  torchtext==0.9.0

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.legacy.datasets import Multi30k
from torchtext.legacy.data import Field, BucketIterator
from torch.utils.tensorboard import SummaryWriter
import spacy
import numpy as np
import random
import math
import time
from tqdm import tqdm


## Reproducibility

In [None]:
# Seed every random number generator used in this notebook so that repeated
# runs start from the same state.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
# cuDNN benchmarking auto-tunes and may select different kernels from run to
# run, which works against the deterministic setting above; keep it off.
torch.backends.cudnn.benchmark = False

## Functions to Prepare Data

In [None]:
# Load the German and English spaCy pipelines once at module level; the
# tokenizer helpers below and translate_sentence read these objects.
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

def tokenize_de(text):
  """Split German text into a list of token strings.

  The split is done by the tokenizer of the module-level ``spacy_de``
  pipeline. Any other tokenization scheme can be substituted here.
  """
  pieces = []
  for token in spacy_de.tokenizer(text):
    pieces.append(token.text)
  return pieces

def tokenize_en(text):
  """Split English text into a list of token strings.

  The split is done by the tokenizer of the module-level English pipeline
  ``spacy_en`` (previously the German pipeline was used by mistake). Any
  other tokenization scheme can be substituted here.
  """
  return [tok.text for tok in spacy_en.tokenizer(text)]

## Data Preparation

In [None]:
# Multi30k is loaded below with exts=('.en', '.de') and fields=(Source, Target),
# so Source holds the English side and Target the German side. Each field
# therefore gets the tokenizer of its own language.
Source = Field(sequential=True, tokenize=tokenize_en,
               init_token='<SOS>',eos_token='<EOS>',
               use_vocab=True,unk_token='<unk>',
               lower=True)
Target = Field(sequential=True, tokenize=tokenize_de,
               init_token='<SOS>',eos_token='<EOS>',
               use_vocab=True,unk_token='<unk>',
               lower=True)
# The order of `exts` is paired with the order of `fields`. To translate in the
# opposite direction, swap the extensions AND the tokenizers of the two fields.
train, validation, test = Multi30k.splits(exts=('.en','.de'),fields=(Source, Target))

## Data Investigation and Vocabulary Building

In [None]:
# Size of each split.
print(f'Number of training samples: {len(train.examples)}')
print(f'Number of validation samples: {len(validation.examples)}')
print(f'Number of test samples: {len(test.examples)}')

# Peek at the first training example (object and its attribute dict).
print(train.examples[0])
print(vars(train.examples[0]))

# Vocabularies are built from the training split only.
Source.build_vocab(train,min_freq=2,max_size=10000)
Target.build_vocab(train, min_freq=2,max_size=10000)

# The dataset is loaded with exts=('.en', '.de'), so Source is the English
# vocabulary and Target is the German one; the labels below reflect that.
print(f'Number of unique tokens in English is: {len(Source.vocab)}')
print(f'Number of unique tokens in German is: {len(Target.vocab)}')



## Hyperparameters and Data Loaders

In [None]:
# Run on the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 128
# Vocabulary sizes determine the embedding tables and the output layer.
input_dim = len(Source.vocab)
output_dim = len(Target.vocab)
enc_embd_dim = 300
dec_embd_dim = 300
hid_dim = 1024
n_layers = 2
enc_dropout_p = 0.5
dec_dropout_p = 0.5
learning_rate = 0.001
n_epochs = 20
# Maximum gradient norm passed to clip_grad_norm_ in train().
clip = 1
# Plain flag forwarded to the encoder LSTM. (An accidental
# `from unicodedata import bidirectional` used to precede this cell; the name
# was always overwritten here, so the import was dropped.)
bidirectional = False
# TensorBoard writer used by train() to log the per-batch loss.
writer = SummaryWriter('runs/loss_plot')
# Global TensorBoard step counter read by train() (was misspelled `sytep`).
step = 0
# Bucket examples of similar source length together to reduce padding.
train_iter, validation_iter, test_iter = BucketIterator.splits((train,validation,test),
                                                               batch_size=batch_size,
                                                               sort_within_batch = True,
                                                               sort_key = lambda x:len(x.src),
                                                               device=device)


## Model Definition

In [None]:
class Encoder(nn.Module):
  """LSTM encoder that reduces a source sequence to its final LSTM states.

  Args:
    input_dim: number of entries in the embedding table.
    emb_dim: size of each embedding vector.
    hid_dim: size of the LSTM hidden state.
    n_layers: number of stacked LSTM layers.
    dropout_p: dropout probability for the embeddings and between LSTM layers.
    bidirectional: forwarded unchanged to ``nn.LSTM``.
  """

  def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout_p, bidirectional):
    super().__init__()

    # Kept as attributes so BasicSeq2Seq can compare them with the decoder's.
    self.hid_dim, self.n_layers = hid_dim, n_layers

    self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
    self.rnn = nn.LSTM(
      input_size=emb_dim,
      hidden_size=hid_dim,
      num_layers=n_layers,
      dropout=dropout_p,
      bidirectional=bidirectional,
    )
    self.dropout = nn.Dropout(p=dropout_p)

  def forward(self, x):
    """Encode ``x`` of shape (x len, batch_size) and return ``(hidden, cell)``.

    Both returned tensors have shape (n_layers * directions, batch_size, hid_dim).
    """
    # (x len, batch_size) -> (x len, batch_size, emb_dim)
    token_vectors = self.embedding(x)
    rnn_input = self.dropout(token_vectors)

    # The per-step outputs are not needed; only the final states are returned.
    _, final_states = self.rnn(rnn_input)
    hidden, cell = final_states
    return hidden, cell

class Decoder(nn.Module):
  """Unidirectional LSTM decoder that predicts one target token per call.

  Args:
    output_dim: size of the target vocabulary (embedding rows and logits).
    embd_dim: size of each embedding vector.
    hid_dim: size of the LSTM hidden state.
    n_layers: number of stacked LSTM layers.
    dropout_p: dropout probability for the embeddings and between LSTM layers.
  """

  def __init__(self, output_dim, embd_dim, hid_dim, n_layers, dropout_p):
    super().__init__()

    self.output_dim, self.hid_dim, self.n_layers = output_dim, hid_dim, n_layers

    # NOTE: the attribute is spelled "embeddig"; the spelling is part of the
    # state_dict keys, so it is kept as is.
    self.embeddig = nn.Embedding(num_embeddings=output_dim, embedding_dim=embd_dim)
    self.rnn = nn.LSTM(
      input_size=embd_dim,
      hidden_size=hid_dim,
      num_layers=n_layers,
      dropout=dropout_p,
    )
    # The projection takes hid_dim features because the LSTM is unidirectional.
    self.fc_out = nn.Linear(in_features=hid_dim, out_features=output_dim)
    self.dropout = nn.Dropout(p=dropout_p)

  def forward(self, input, hidden, cell):
    """Run a single decoding step.

    Args:
      input: token indices of shape (batch_size).
      hidden: LSTM hidden state of shape (n_layers, batch_size, hid_dim).
      cell: LSTM cell state of shape (n_layers, batch_size, hid_dim).

    Returns:
      ``(prediction, hidden, cell)`` where prediction has shape
      (batch_size, output_dim) and the states keep their input shapes.
    """
    # Add a sequence axis of length one: (batch_size) -> (1, batch_size)
    step_tokens = input.unsqueeze(0)

    # (1, batch_size) -> (1, batch_size, embd_dim)
    step_embedded = self.dropout(self.embeddig(step_tokens))

    # step_output: (1, batch_size, hid_dim)
    step_output, states = self.rnn(step_embedded, (hidden, cell))
    hidden, cell = states

    # Drop the length-one sequence axis before projecting to vocabulary logits.
    prediction = self.fc_out(step_output.squeeze(0))
    return prediction, hidden, cell

class BasicSeq2Seq(nn.Module):
  """Encoder-decoder wrapper that decodes the target one token at a time.

  Args:
    encoder: callable returning ``(hidden, cell)`` for a source batch; must
      expose ``hid_dim`` and ``n_layers``.
    decoder: callable mapping ``(input, hidden, cell)`` to
      ``(prediction, hidden, cell)``; must expose ``hid_dim``, ``n_layers``
      and ``output_dim``.
  """

  def __init__(self,encoder, decoder):
    super().__init__()

    self.encoder = encoder
    self.decoder = decoder

    assert encoder.hid_dim == decoder.hid_dim, "Hidden dimensions of encoder and decoder must be same!"
    assert encoder.n_layers == decoder.n_layers, "Encoder and decoder must have the same number of layers!"

  def forward(self,source,target, teacher_forcing_ratio = 0.5):
    """Return decoder logits for every target position.

    Args:
      source: source token indices, shape (source len, batch_size).
      target: target token indices, shape (target len, batch_size).
      teacher_forcing_ratio: probability, per step, of feeding the ground-truth
        token instead of the model's own prediction as the next input.

    Returns:
      Tensor of shape (target len, batch_size, decoder.output_dim). Row 0 is
      never written and stays zero, because decoding starts from target[0].
    """
    target_len, batch_size = target.shape[0], target.shape[1]
    target_vocab_size = self.decoder.output_dim

    # Allocate on the same device as the inputs rather than relying on a
    # notebook-level `device` global, so the module works wherever it is moved.
    outputs = torch.zeros(target_len, batch_size, target_vocab_size, device=target.device)

    # The encoder's final states initialise the decoder.
    hidden, cell = self.encoder(source)

    # The first decoder input is the first target row (the <sos> tokens).
    decoder_input = target[0,:]

    for t in range(1, target_len):
      output, hidden, cell = self.decoder(decoder_input, hidden, cell)
      outputs[t] = output

      # One random draw per step decides between ground truth and prediction.
      teacher_force = random.random() < teacher_forcing_ratio
      top = output.argmax(1)
      decoder_input = target[t] if teacher_force else top

    return outputs

In [None]:
!nvidia-smi

## Model Training

In [None]:
# Build both halves of the network from the hyperparameters defined earlier and
# place them on the selected device.
encoder = Encoder(
  input_dim=input_dim,
  emb_dim=enc_embd_dim,
  hid_dim=hid_dim,
  n_layers=n_layers,
  dropout_p=enc_dropout_p,
  bidirectional=bidirectional,
).to(device)
decoder = Decoder(
  output_dim=output_dim,
  embd_dim=dec_embd_dim,
  hid_dim=hid_dim,
  n_layers=n_layers,
  dropout_p=dec_dropout_p,
).to(device)

# Wrap encoder and decoder into the full sequence-to-sequence model.
model = BasicSeq2Seq(encoder, decoder).to(device)

### Weight Initialization

In [None]:
def init_weight(m):
  """Fill the parameters owned directly by ``m`` with values from U(-0.05, 0.05).

  Intended to be passed to ``Module.apply``, which already visits every
  submodule. Only the module's own parameters are touched (``recurse=False``),
  so each parameter is initialised exactly once instead of once per ancestor.
  """
  for param in m.parameters(recurse=False):
    nn.init.uniform_(param, -0.05, 0.05)
model.apply(init_weight)

#### Number of trainable parameters in model

In [None]:
def count_parameters(model):
  """Return the total element count of all parameters that require gradients."""
  total = 0
  for tensor in model.parameters():
    if tensor.requires_grad:
      total += tensor.numel()
  return total

print(f'The model has {count_parameters(model)}, trainable parameters.')

### **Optimizer and Loss functions**

In [None]:
# Positions holding the target padding token are excluded from the loss.
Target_pad_index = Target.vocab.stoi[Target.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=Target_pad_index)

# Adam over every parameter of the seq2seq model.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

### **Train and Evaluation functions**

In [None]:
def train(model,iterator, optimizer, criterion,clip):
  """Train ``model`` for one pass over ``iterator`` and return the mean batch loss.

  Args:
    model: seq2seq model called as ``model(source, target)``.
    iterator: iterable of batches exposing ``.src`` and ``.trg``; must support
      ``len()``.
    optimizer: optimizer over ``model.parameters()``.
    criterion: loss taking (logits, target indices).
    clip: maximum gradient norm passed to ``clip_grad_norm_``.

  Returns:
    Sum of the per-batch losses divided by the number of batches.

  Side effects:
    Logs every batch loss to the module-level ``writer`` and advances the
    module-level ``step`` counter.
  """
  # `step` lives at notebook level; without this declaration `step += 1` would
  # make it a local and raise UnboundLocalError on first use.
  global step

  model.train()
  epoch_loss = 0

  for batch in iterator:
    source = batch.src
    target = batch.trg

    optimizer.zero_grad()

    output = model(source,target)

    # target shape = (target len, batch_size)
    # output shape = (target len, batch_size, output_dim)
    output_dim = output.shape[-1]

    # Drop position 0 (the start token, never predicted) and flatten so the
    # criterion sees (N, output_dim) logits against (N) targets.
    output = output[1:].view(-1, output_dim)
    target = target[1:].view(-1)

    loss = criterion(output,target)

    loss.backward()

    nn.utils.clip_grad_norm_(model.parameters(),clip)

    # Apply the gradients; without this call the weights never change.
    optimizer.step()

    batch_loss = loss.item()
    epoch_loss += batch_loss
    writer.add_scalar('Training loss',batch_loss,global_step=step)
    step +=1

  # Return only after every batch has been processed.
  return epoch_loss/ len(iterator)

def evaluate(model, iterator, criterion):
  """Return the mean batch loss of ``model`` over ``iterator`` without training.

  Args:
    model: seq2seq model called as ``model(source, target, teacher_forcing_ratio)``.
    iterator: iterable of batches exposing ``.src`` and ``.trg``; must support
      ``len()``.
    criterion: loss taking (logits, target indices).

  Returns:
    Sum of the per-batch losses divided by the number of batches.
  """
  model.eval()

  epoch_loss = 0

  with torch.no_grad():
    for batch in iterator:
      source = batch.src
      target = batch.trg

      # Teacher forcing is switched off so the model is scored on its own
      # predictions and the result does not depend on random draws.
      output = model(source, target, 0)

      output_dim = output.shape[-1]

      # Drop position 0 (the start token) and flatten for the criterion.
      output = output[1:].view(-1,output_dim)
      target = target[1:].view(-1)

      loss = criterion(output,target)

      epoch_loss += loss.item()

  # Return only after every batch has been evaluated.
  return epoch_loss/len(iterator)


def translate_sentence(model, sentence, source, target, device, max_lenght=50):
  """Greedily translate one sentence and return the predicted target tokens.

  Args:
    model: object exposing ``encoder`` and ``decoder`` callables plus the
      ``eval()``, ``train(mode)`` and ``training`` members of ``nn.Module``.
    sentence: raw string (tokenized with the module-level ``spacy_en``) or an
      iterable of token strings.
    source: source field providing ``init_token``, ``eos_token`` and
      ``vocab.stoi``.
    target: target field providing ``init_token``, ``eos_token``,
      ``vocab.stoi`` and ``vocab.itos``.
    device: device on which the index tensors are created.
    max_lenght: maximum number of decoding steps.

  Returns:
    List of predicted target tokens without the leading start token; the end
    token is included when it was produced within ``max_lenght`` steps.
  """
  # Remember the current mode so it can be restored instead of always forcing
  # training mode on exit.
  was_training = model.training
  model.eval()

  if isinstance(sentence, str):
    tokens = [tok.text.lower() for tok in spacy_en(sentence)]
  else:
    tokens = [token.lower() for token in sentence]

  tokens = [source.init_token, *tokens, source.eos_token]

  text_to_indices = [source.vocab.stoi[token] for token in tokens]
  sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

  # Decoding must start from the start-of-sequence token, which is what the
  # decoder receives as its first input during training (target[0]); the
  # padding token was used here before.
  outputs = [target.vocab.stoi[target.init_token]]
  eos_index = target.vocab.stoi[target.eos_token]

  try:
    with torch.no_grad():
      hidden, cell = model.encoder(sentence_tensor)

      for _ in range(max_lenght):
        previous_word = torch.LongTensor([outputs[-1]]).to(device)
        output, hidden, cell = model.decoder(previous_word, hidden, cell)
        best_guess = output.argmax(1).item()
        outputs.append(best_guess)

        if best_guess == eos_index:
          break
  finally:
    model.train(was_training)

  # Skip the start token that seeded the decoder.
  return [target.vocab.itos[idx] for idx in outputs[1:]]

## Function to tell how long an epoch takes
def epoch_time(start_time, end_time):
  """Split the span between two timestamps (seconds) into whole minutes and seconds.

  Returns:
    ``(minutes, seconds)`` as integers, each truncated toward zero.
  """
  span = end_time - start_time
  minutes = int(span / 60)
  # Whatever is left after removing the whole minutes, truncated to an int.
  seconds = int(span - 60 * minutes)
  return minutes, seconds


## **Training**

In [None]:
# Lowest validation loss seen so far; starts at infinity so the first epoch
# always produces a checkpoint.
best_valid_loss = float('inf')
# Global counter that train() passes to the TensorBoard writer as global_step.
step = 0
for epoch in range(n_epochs):
  start_time = time.time()
  train_loss = train(model, train_iter, optimizer, criterion, clip)
  valid_loss = evaluate(model, validation_iter, criterion)
  end_time = time.time()
  epoch_mins, epoch_sec = epoch_time(start_time,end_time)
  # Keep only the weights of the epoch with the best validation loss.
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'BasicSeq2Se.pt')
  # Report loss and perplexity (exp of the loss) for this epoch.
  print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_sec}s ')
  print(f'\t Train loss: {train_loss: .3f} | Train PPL: {math.exp(train_loss):7.3f}')
  print(f'\t Val loss: {valid_loss: .3f} | Val. PPL: {math.exp(valid_loss):7.3f}')

### Load the best model and evaluate it on the test set

In [None]:
# Restore the checkpoint saved at the best validation loss and score it on the
# held-out test split.
model.load_state_dict(torch.load('BasicSeq2Se.pt'))
test_loss = evaluate(model,test_iter,criterion)
# Report the test loss itself (the validation loss was printed here before).
print(f'\t Test loss: {test_loss: .3f} | Test. PPL: {math.exp(test_loss):7.3f}')

### Translate a sentence

In [None]:
# Translate a single example sentence with the model; translate_sentence
# returns the list of predicted target tokens, which is printed as is.
Sentence = 'I will go to school with my friends.'
print(translate_sentence(model,Sentence,Source, Target, device))