<a href="https://colab.research.google.com/github/Jaseelkt007/ML/blob/master/Seq2Seq_translation_with_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sequence to Sequence Machine Translation using Attention Mechanism
#### Based on the paper "Neural Machine Translation by Jointly Learning to Align and Translate" (Bahdanau et al.; arXiv 2014, published at ICLR 2015)
#### This paper introduces the attention mechanism, which allows the model to dynamically focus on different parts of the source sentence at each decoding time step, rather than relying on a single fixed-size context vector as in the 2014 sequence-to-sequence paper.
#### The model learns to align parts of the input sentence with the corresponding parts of the output sentence.
#### Training: at every decoding step, attention scores are computed from the decoder's current hidden state and the encoder state at each source time step (the scores are produced by a small feed-forward network and normalized into attention weights). The weights are multiplied element-wise with the corresponding encoder states and summed to form the context vector. This context vector is concatenated with the decoder's target input embedding and fed to the decoder LSTM to predict the next word; the process repeats until the end of the sentence.

In [1]:
# Mount Google Drive at /content/drive; every dataset and checkpoint path used
# later in this notebook lives under that mount point (Colab-only import).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Decompress the files
# Done in Python and guarded by an existence check so the cell is idempotent:
# a plain `gunzip` removes the .gz archive, so re-running the cell (e.g. after
# "Restart & Run All") fails with "No such file or directory".
import gzip
import os
import shutil

raw_dir = '/content/drive/MyDrive/multi30k_data/multi30k-dataset/data/task1/raw'

for split_file in ('train.de', 'train.en',
                   'val.de', 'val.en',
                   'test_2016_flickr.de', 'test_2016_flickr.en'):
    plain_path = os.path.join(raw_dir, split_file)
    if os.path.exists(plain_path):
        continue  # already decompressed by an earlier run

    # Write to a temporary name first so an interrupted run never leaves a
    # truncated file that the existence check above would mistake for a good one.
    partial_path = plain_path + '.partial'
    with gzip.open(plain_path + '.gz', 'rb') as packed, open(partial_path, 'wb') as unpacked:
        shutil.copyfileobj(packed, unpacked)
    os.replace(partial_path, plain_path)

In [3]:
# Define the paths to the dataset files inside the multi30k-dataset folder in your Drive.
# The directory is kept in one place so relocating the data means editing DATA_DIR only.
DATA_DIR = '/content/drive/MyDrive/multi30k_data/multi30k-dataset/data/task1/raw'

train_de_path = f'{DATA_DIR}/train.de'
train_en_path = f'{DATA_DIR}/train.en'

val_de_path = f'{DATA_DIR}/val.de'
val_en_path = f'{DATA_DIR}/val.en'

test_de_path = f'{DATA_DIR}/test_2016_flickr.de'
test_en_path = f'{DATA_DIR}/test_2016_flickr.en'

# Function to load data from a file
def load_data(file_path):
    """Read a UTF-8 text file and return its lines without the trailing newline.

    Args:
        file_path: path of the text file to read (one sentence per line).

    Returns:
        list[str]: one entry per line, in file order. Blank lines are kept (as
        empty strings) so that parallel source/target files stay index-aligned.

    The newline is stripped because the spaCy tokenizers used downstream emit a
    trailing "\\n" as its own token, which would otherwise be appended to every
    training sentence but never to sentences passed in at inference time.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.rstrip('\n') for line in f]

# Read every split into memory; the German file is read before the English one
# for each of the training, validation and test sets.
train_ger, train_eng = load_data(train_de_path), load_data(train_en_path)
val_ger, val_eng = load_data(val_de_path), load_data(val_en_path)
test_ger, test_eng = load_data(test_de_path), load_data(test_en_path)

In [None]:
!pip install torch torchtext spacy
!python -m spacy download de_core_news_sm
!python -m spacy download en_core_web_sm

In [5]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Ensure the tokenizer data needed by nltk.word_tokenize (used in bleu) is present.
# Older NLTK releases read 'punkt'; newer releases look up 'punkt_tab' instead,
# so both are requested to keep the notebook working across NLTK versions.
nltk.download('punkt')
nltk.download('punkt_tab')

def bleu(data, model, german_vocab, english_vocab, device, max_length=50):
    """Average sentence-level BLEU of the model's translations over `data`.

    Args:
        data: iterable of (source_sentence, target_sentence) pairs. The source is
            handed to translate_sentence unchanged; a target given as a string is
            lower-cased and tokenized with nltk.word_tokenize, otherwise it is
            used as an already-tokenized reference.
        model: the seq2seq model forwarded to translate_sentence.
        german_vocab: source vocabulary forwarded to translate_sentence.
        english_vocab: target vocabulary forwarded to translate_sentence.
        device: torch device forwarded to translate_sentence.
        max_length: maximum number of decoding steps per sentence.

    Returns:
        float: mean of the per-sentence BLEU scores (smoothing method 4), or 0.0
        when `data` is empty.
    """
    pairs = list(data)
    if not pairs:
        # Nothing to score; avoids a ZeroDivisionError in the final average.
        return 0.0

    # Helps to handle short sentences and BLEU-0 cases
    smooth = SmoothingFunction().method4

    # Score in eval mode so dropout is disabled, then restore the caller's mode.
    was_training = model.training
    model.eval()
    try:
        bleu_scores = []
        for src, trg in pairs:
            prediction = translate_sentence(
                model, src, german_vocab, english_vocab, device, max_length=max_length
            )

            # Drop the trailing <eos>; guard against an empty prediction.
            if prediction and prediction[-1] == '<eos>':
                prediction = prediction[:-1]

            # Tokenize the target sentence if it's not already tokenized
            if isinstance(trg, str):
                trg = nltk.word_tokenize(trg.lower())

            # sentence_bleu expects a list of references for each hypothesis.
            bleu_scores.append(sentence_bleu([trg], prediction, smoothing_function=smooth))
    finally:
        model.train(was_training)

    return sum(bleu_scores) / len(bleu_scores)


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
import spacy
import random
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
from torch.utils.tensorboard import SummaryWriter
import torch.optim as optim
import os
%load_ext tensorboard

# spaCy pipelines whose tokenizers split German and English text into tokens
spacy_de, spacy_en = spacy.load('de_core_news_sm'), spacy.load('en_core_web_sm')

def tokenize_ger(text):
    """Return the lower-cased token strings of `text` using the German spaCy tokenizer."""
    lowered = []
    for piece in spacy_de.tokenizer(text):
        lowered.append(piece.text.lower())
    return lowered

def tokenize_eng(text):
    """Return the lower-cased token strings of `text` using the English spaCy tokenizer."""
    return list(map(lambda piece: piece.text.lower(), spacy_en.tokenizer(text)))

# Marker strings: sequence start, sequence end, padding, out-of-vocabulary word
INIT_TOKEN, EOS_TOKEN, PAD_TOKEN, UNK_TOKEN = '<sos>', '<eos>', '<pad>', '<unk>'

def translate_sentence(model, sentence, german_vocab, english_vocab, device, max_length=50): # for inference
    """Greedily translate one German sentence to a list of English tokens.

    Args:
        model: object exposing `encoder(src)` -> (encoder_states, hidden, cell)
            and `decoder(prev_word, encoder_states, hidden, cell)`
            -> (logits, hidden, cell).
        sentence: a raw string (tokenized with tokenize_ger) or an iterable of
            already-split tokens; tokens are lower-cased either way.
        german_vocab: dict mapping source token -> index; must contain '<unk>'.
        english_vocab: dict mapping target token -> index; must contain
            '<sos>' and '<eos>'.
        device: torch device the input tensors are moved to.
        max_length: maximum number of decoder steps.

    Returns:
        list[str]: predicted tokens without the leading '<sos>'; the final
        element is '<eos>' when the model emitted it within `max_length` steps.
    """
    # Tokenize the input sentence (tokenize_ger already lower-cases its output)
    if isinstance(sentence, str):
        tokens = tokenize_ger(sentence)
    else:
        tokens = [token.lower() for token in sentence]  # already tokenized

    # Wrap with <sos> ... <eos>, as the encoder saw during training
    tokens = ['<sos>'] + list(tokens) + ['<eos>']

    # Convert the tokens into their indices; unknown words map to <unk>
    unk_index = german_vocab['<unk>']
    source_indices = [german_vocab.get(token, unk_index) for token in tokens]

    # Shape (seq_length, 1): unsqueeze adds the batch dimension
    sentence_tensor = torch.LongTensor(source_indices).unsqueeze(1).to(device)

    # Index -> word table derived from the vocabulary that was passed in, so the
    # function does not depend on a module-level reverse mapping.
    index_to_word = {idx: word for word, idx in english_vocab.items()}
    eos_index = english_vocab['<eos>']

    # Start decoding from <sos>
    outputs = [english_vocab['<sos>']]

    with torch.no_grad():
        encoder_states, hidden, cell = model.encoder(sentence_tensor)

        for _ in range(max_length):
            previous_words = torch.LongTensor([outputs[-1]]).to(device)
            output, hidden, cell = model.decoder(previous_words, encoder_states, hidden, cell)
            best_guess = output.argmax(1).item()
            outputs.append(best_guess)

            if best_guess == eos_index:
                break

    # Convert the output indices back to words, skipping the leading <sos>
    return [index_to_word.get(idx, '<unk>') for idx in outputs[1:]]

def collate_fn(batch):
    """Turn a list of (source, target) index tensors into two padded batches.

    Each side is padded with the module-level `pad_idx` up to its longest
    sequence, giving tensors laid out as (max_len, batch) since
    batch_first=False.
    """
    sources, targets = map(list, zip(*batch))

    def pad(sequences):
        return pad_sequence(sequences, batch_first=False, padding_value=pad_idx)

    return pad(sources), pad(targets)

# Build vocabulary from tokenized sentences
def build_vocab(sentences, tokenizer, min_freq=2, max_size=10000):
    """Build a token -> index mapping from `sentences`.

    Tokens are ranked by descending frequency (ties broken alphabetically); of
    the first `max_size` ranked tokens, those seen at least `min_freq` times get
    index rank + 4. Indices 0-3 are assigned to the special tokens.
    """
    frequencies = Counter(
        token for sentence in sentences for token in tokenizer(sentence)
    )
    ranked = sorted(frequencies.items(), key=lambda item: (-item[1], item[0]))

    vocab = {}
    for rank, (word, count) in enumerate(ranked[:max_size]):
        if count >= min_freq:
            vocab[word] = rank + 4

    # Special tokens occupy the four lowest indices
    vocab.update({INIT_TOKEN: 0, EOS_TOKEN: 1, PAD_TOKEN: 2, UNK_TOKEN: 3})
    return vocab

# Source (German) and target (English) vocabularies, built from the training split
german_vocab, english_vocab = (
    build_vocab(train_ger, tokenize_ger),
    build_vocab(train_eng, tokenize_eng),
)

# Inverse lookups: index -> token string
german_itos = dict((idx, word) for word, idx in german_vocab.items())
english_itos = dict((idx, word) for word, idx in english_vocab.items())


class TranslationDataset(Dataset):
    """Parallel corpus that yields (source_indices, target_indices) tensor pairs.

    Every sentence is tokenized on access, wrapped in INIT_TOKEN / EOS_TOKEN and
    mapped through its vocabulary; tokens missing from the vocabulary map to the
    UNK_TOKEN index.
    """

    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, src_tokenizer, tgt_tokenizer):
        self.src_sentences, self.tgt_sentences = src_sentences, tgt_sentences
        self.src_vocab, self.tgt_vocab = src_vocab, tgt_vocab
        self.src_tokenizer, self.tgt_tokenizer = src_tokenizer, tgt_tokenizer

    def __len__(self):
        # The dataset length is defined by the source side
        return len(self.src_sentences)

    @staticmethod
    def _encode(sentence, tokenizer, vocab):
        """Tokenize one sentence and return its vocabulary indices as a tensor."""
        unknown = vocab[UNK_TOKEN]
        wrapped = [INIT_TOKEN, *tokenizer(sentence), EOS_TOKEN]
        return torch.tensor([vocab.get(token, unknown) for token in wrapped])

    def __getitem__(self, idx):
        source = self._encode(self.src_sentences[idx], self.src_tokenizer, self.src_vocab)
        target = self._encode(self.tgt_sentences[idx], self.tgt_tokenizer, self.tgt_vocab)
        return source, target


class Encoder(nn.Module):
    """Bidirectional LSTM encoder.

    Args:
        input_size: size of the source vocabulary.
        hidden_size: LSTM hidden size per direction.
        embedding_size: dimensionality of the token embeddings.
        num_layers: number of stacked LSTM layers.
        p: dropout probability, applied to the embeddings and (only when
            num_layers > 1) between LSTM layers.
    """

    def __init__(self, input_size, hidden_size, embedding_size, num_layers, p) -> None:
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.dropout = nn.Dropout(p)
        # nn.LSTM applies dropout only between layers and warns when it is set
        # on a single-layer network, so it is disabled in that case.
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, bidirectional=True,
                           dropout=p if num_layers > 1 else 0.0)
        # Project the concatenated forward/backward state back to hidden_size
        self.fc_hidden = nn.Linear(hidden_size * 2, hidden_size)
        self.fc_cell = nn.Linear(hidden_size * 2, hidden_size)

    def _merge_directions(self, state, projection):
        """Fuse forward/backward states of every layer into one state per layer.

        state: (2 * num_layers, N, hidden_size), ordered layer-major with the
        forward direction first. Returns (num_layers, N, hidden_size). For
        num_layers == 1 this equals cat(state[0:1], state[1:2]) -> projection.
        """
        batch = state.shape[1]
        per_layer = state.reshape(self.num_layers, 2, batch, self.hidden_size)
        both_directions = torch.cat((per_layer[:, 0], per_layer[:, 1]), dim=2)
        return projection(both_directions)

    def forward(self, x):
        """Encode a batch of source sequences.

        Args:
            x: LongTensor of token indices, shape (seq_length, N).

        Returns:
            encoder_states: (seq_length, N, hidden_size * 2), the hidden states
                of every time step (both directions concatenated).
            hidden, cell: (num_layers, N, hidden_size), the initial decoder state.
        """
        embedding = self.dropout(self.embedding(x))
        # embedding shape : (seq_length, N, embedding_size)

        encoder_states, (hidden, cell) = self.rnn(embedding)

        # final states of the BiLSTM : (2 * num_layers, N, hidden_size)
        hidden = self._merge_directions(hidden, self.fc_hidden)
        cell = self._merge_directions(cell, self.fc_cell)

        return encoder_states, hidden, cell

class Decoder(nn.Module):
    """Single-step LSTM decoder with additive-style attention over encoder states.

    Args:
        input_size: size of the target vocabulary (embedding rows).
        embedding_size: dimensionality of the target token embeddings.
        hidden_size: LSTM hidden size; encoder states are expected to have
            hidden_size * 2 features.
        output_size: number of classes predicted per step.
        num_layers: number of stacked LSTM layers.
        p: dropout probability applied to the embeddings.
    """

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, p) -> None:
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        # LSTM input = context vector (hidden_size * 2) + token embedding
        self.rnn = nn.LSTM(hidden_size * 2 + embedding_size, hidden_size, num_layers)

        # Scores one (decoder state, encoder state) pair with a scalar energy
        self.energy = nn.Linear(hidden_size * 3, 1)
        self.softmax = nn.Softmax(dim=0)  # normalise over the source positions
        self.relu = nn.ReLU()

        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, encoder_states, hidden, cell):
        """Predict the next token for every sequence in the batch.

        Args:
            x: LongTensor (N,), the previous target token of each sequence.
            encoder_states: (sequence_length, N, hidden_size * 2).
            hidden, cell: (num_layers, N, hidden_size), current decoder state.

        Returns:
            predictions: (N, output_size) unnormalised scores.
            hidden, cell: updated decoder state, (num_layers, N, hidden_size).

        NOTE(review): attention weights are not masked, so padded source
        positions can receive weight; confirm whether that is acceptable.
        """
        # shape of x : (N) but we want (1, N)
        x = x.unsqueeze(0)

        embedding = self.dropout(self.embedding(x))
        # embedding shape : (1, N, embedding_size)

        sequence_length = encoder_states.shape[0]
        # Attention is driven by the top decoder layer; hidden[-1:] keeps the
        # leading dim so this also works for num_layers > 1 (and is identical to
        # using `hidden` when num_layers == 1). expand avoids copying the state.
        query = hidden[-1:].expand(sequence_length, -1, -1)  # (sequence_length, N, hidden_size)

        # (sequence_length, N, hidden_size * 3) -> (sequence_length, N, 1)
        energy = self.relu(self.energy(torch.cat((query, encoder_states), dim=2)))
        attention = self.softmax(energy)

        # bmm expects the batch in dimension 0
        attention = attention.permute(1, 2, 0)                 # (N, 1, sequence_length)
        states_batch_first = encoder_states.permute(1, 0, 2)   # (N, sequence_length, hidden_size * 2)
        # (N, 1, hidden_size * 2) --> (1, N, hidden_size * 2)
        context_vector = torch.bmm(attention, states_batch_first).permute(1, 0, 2)

        rnn_input = torch.cat((context_vector, embedding), dim=2)
        # rnn_input shape : (1, N, hidden_size * 2 + embedding_size)

        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        # output shape : (1, N, hidden_size)

        # (1, N, output_size) -> (N, output_size)
        predictions = self.fc(output).squeeze(0)

        return predictions, hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: module returning (encoder_states, hidden, cell) for a source batch.
        decoder: module called as decoder(x, encoder_states, hidden, cell) and
            returning (scores, hidden, cell); it must expose its output layer as
            `fc` so the target vocabulary size can be read from it.
    """

    def __init__(self, encoder, decoder) -> None:
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_force_ratio=0.5):
        """Run the encoder once, then decode `target_len - 1` steps.

        Args:
            source: LongTensor (source_len, N).
            target: LongTensor (target_len, N); target[0] is the first decoder input.
            teacher_force_ratio: probability, per step, of feeding the ground-truth
                token instead of the model's own prediction as the next input.

        Returns:
            Tensor (target_len, N, target_vocab_size) on the same device as
            `source`; row 0 is left as zeros because nothing is predicted for it.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Read the vocabulary size from the decoder's output layer rather than
        # from a module-level vocabulary, so the model is self-contained.
        target_vocab_size = self.decoder.fc.out_features

        # Allocate on the input's device to avoid a device-to-host copy per step.
        outputs = torch.zeros(target_len, batch_size, target_vocab_size, device=source.device)

        encoder_states, hidden, cell = self.encoder(source)

        x = target[0]  # first input to the decoder

        for t in range(1, target_len):
            # output --> (batch_size, target_vocab_size)
            output, hidden, cell = self.decoder(x, encoder_states, hidden, cell)

            outputs[t] = output

            best_guess = output.argmax(1)
            # Teacher forcing: with probability teacher_force_ratio the next input
            # is the ground-truth token, otherwise the model's own best guess.
            # A fixed ratio is used here for simplicity.
            x = target[t] if random.random() < teacher_force_ratio else best_guess

        return outputs

# Padding index used by collate_fn and ignored by the loss. build_vocab assigns
# the same index to PAD_TOKEN in both vocabularies, so one value serves both sides.
pad_idx = english_vocab['<pad>']

# Hyperparameters
num_layers = 1
learning_rate = 0.001
batch_size = 64
num_epochs = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size_encoder = len(german_vocab)
input_size_decoder = len(english_vocab)
output_size = len(english_vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
enc_dropout = 0.5
dec_dropout = 0.5

# Seed the RNGs that drive weight init, shuffling and teacher forcing so that
# "Restart & Run All" reproduces the same run.
seed = 42
random.seed(seed)
torch.manual_seed(seed)

# TensorBoard writer and the global step counter used when logging the loss
writer = SummaryWriter('runs/loss_plot')
step = 0

# Create the dataset
train_dataset = TranslationDataset(train_ger, train_eng, german_vocab, english_vocab, tokenize_ger, tokenize_eng)
# DataLoader: use the batch_size hyperparameter declared above
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

encoder_net = Encoder(input_size_encoder, hidden_size, encoder_embedding_size, num_layers, enc_dropout).to(device)
decoder_net = Decoder(input_size_decoder, decoder_embedding_size, hidden_size, output_size, num_layers, dec_dropout).to(device)
model = Seq2Seq(encoder_net, decoder_net).to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Padded target positions do not contribute to the loss
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)


# Training
# Define checkpoint path to save the checkpoint(each epoch)
checkpoint_path = '/content/drive/MyDrive/multi30k_data/model_checkpoint.pth'

sentence = "ein boot mit mehreren männern darauf wird von einem großen pferdegespann ans ufer gezogen." # for inference

for epoch in range(num_epochs):
    # Translate a fixed example before each epoch to watch qualitative progress;
    # eval mode disables dropout for the translation.
    model.eval()
    translated_sentence = translate_sentence(model, sentence, german_vocab, english_vocab, device, max_length=50)
    print(f"the Translated example sentences \n {translated_sentence} ")

    model.train()

    for batch_idx, (inp_data, target) in enumerate(train_loader):
        inp_data = inp_data.to(device)
        target = target.to(device)

        output = model(inp_data, target)  # (target_len, batch_size, output_dim)

        # Drop position 0 (the <sos> slot, never predicted) and flatten so the
        # loss sees (N, C) scores against (N,) targets, N = (target_len - 1) * batch_size
        output = output[1:].reshape(-1, output.shape[2])
        target = target[1:].reshape(-1)

        optimizer.zero_grad()
        loss = criterion(output.to(device), target)
        loss.backward()

        # Clip the gradient norm to avoid exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()

        # Log against a counter that keeps growing across epochs; batch_idx
        # restarts at 0 every epoch and would overwrite earlier points.
        writer.add_scalar('Training Loss ', loss.item(), global_step=step)
        step += 1

    # Save after the epoch's updates so the checkpoint always holds the latest
    # weights, including those of the final epoch.
    checkpoint = {'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict()}
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")


# Open TensorBoard inline on the `runs` directory, where the SummaryWriter above logs the training loss.
%tensorboard --logdir=runs

In [None]:
# Pair every German test sentence with its English reference; bleu expects a
# list of (src_sentence, trg_sentence) pairs.
test_data = list(zip(test_ger, test_eng))

# Compute the BLEU score for the first 100 sentences in the test data
score = bleu(test_data[:100], model, german_vocab, english_vocab, device)

# Print the BLEU score as a percentage
print(f"Bleu score: {score * 100:.2f}")