# Introduction

1. Neural Machine Translation (NMT) is the task of using artificial neural network models for translation from one language to the other.
2. The NMT model generally consists of an encoder that encodes a source sentence into a fixed-length vector from which a decoder generates a translation.
3. This problem can be thought as a prediction problem, where given a sequence of words in source language as input, task is to predict the output sequence of words in target language.
4. The dataset comes from http://www.manythings.org/anki/, where you may find tab delimited bilingual sentence pairs in different files based on the source and target language of your choice.
5. For this project, you need to use French - English language pairs just to evaluate the projects uniformly for all students.

#Step-1: Download and clean the data
1. Download the data as zip file and extract it to corresponding txt file. Read this txt file and prepare the list of pairs of language phrases.
2. Now, we will nedd to clean these pairs. For cleaning the text, some of the operations for cleaning are:


*   Remove the non printable charaters, if any
*   Remove punctuations and non-alphabetic charaters
* Convert to lowercase



In [1]:
import re

def clean_text(text):
    """
    Cleans the input text by : 
    - Removing non-printable characters
    - Removing punctuations and non-alphabetic characters
    - Converting to lowercase
    """
    text = re.sub(r'[^\x20-\x7E]', '', text) # Remove non-printable characters
    text = re.sub(r'[^a-zA-Z\s]', '', text)  # Remove punctuations and non-alphabetic characters
    return text.lower()  # Convert to lowercase

def extract_and_clean_pairs(file_path):
    """
    Reads a text file and extracts pairs of language phrases, then cleans them.
    Args : file_path (str) : Path to the text file.
    Returns : list : A list of cleaned language phrase pairs.
    """
    pairs = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Split the line into parts (assuming tab-separated fields)
            parts = line.split('\t')
            if len(parts) >= 2: # Ensure there are at least two fields
                source, target = parts[:2] # Extract the first two fields as a pair
                # Clean each part
                source_cleaned = clean_text(source)
                target_cleaned = clean_text(target)
                pairs.append((source_cleaned, target_cleaned))
    return pairs 

file_path = 'fra.txt'
cleaned_pairs = extract_and_clean_pairs(file_path)

# Display a sample of the cleaned pairs
for pair in cleaned_pairs[:10]:
    print(pair)


('go', 'va ')
('go', 'marche')
('go', 'en route ')
('go', 'bouge ')
('hi', 'salut ')
('hi', 'salut')
('run', 'cours')
('run', 'courez')
('run', 'prenez vos jambes  vos cous ')
('run', 'file ')


#Step-2: Split and prepare the data for training the model
1. After cleaning the data, next you need to split the data in train and test.
2. Then, you need to create separate tokenizer for both source language and target language.
3. After creating the tokenizer, you need to encode and pad the input (source language) and output(target language) sequences w.r.t. their individual tokenizers and maximum sequence lengths.
4. Here, in this problem you will essentially be predicting the words in target language, therefore output seuences will need to be converted in one hot encoding.


In [2]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import torch
from torch.utils.data import DataLoader, Dataset

class LanguageDataset(Dataset):
    """
    Custom PyTorch Dataset for language pairs
    Args : 
    - inputs (array) : Input sequences
    - outputs (array) : Output sequences
    """
    def __init__ (self, inputs, outputs):
        self.inputs = torch.tensor(inputs, dtype = torch.long)
        self.outputs = torch.tensor(outputs, dtype = torch.long)

    def __len__(self):
        return len(self.inputs)
    
    def __getitem__(self, idx):
        return self.inputs[idx], self.outputs[idx]
    
def prepare_data(file_path, test_size = 0.2, max_source_len=10, max_target_len=10):
    # Load and clean data
    pairs = extract_and_clean_pairs(file_path)
    # Split into training and testing sets
    source_texts, target_texts = zip(*pairs)
    X_train, X_test, y_train, y_test = train_test_split(source_texts, target_texts, test_size=test_size, random_state=42)

    # Tokenize source and target texts
    source_tokenizer = Tokenizer()
    target_tokenizer = Tokenizer()
    source_tokenizer.fit_on_texts(X_train)
    target_tokenizer.fit_on_texts(y_train)

    # Encode and pad sequences
    source_train_seq = source_tokenizer.texts_to_sequences(X_train)
    source_test_seq = source_tokenizer.texts_to_sequences(X_test)
    target_train_seq = target_tokenizer.texts_to_sequences(y_train)
    target_test_seq = target_tokenizer.texts_to_sequences(y_test)

    source_train_pad = pad_sequences(source_train_seq, maxlen=max_source_len, padding = 'post')
    source_test_pad = pad_sequences(source_test_seq, maxlen=max_source_len, padding = 'post')
    target_train_pad = pad_sequences(target_train_seq, maxlen=max_source_len, padding = 'post')
    target_test_pad = pad_sequences(target_test_seq, maxlen=max_source_len, padding = 'post')

    # integer indices for targets
    train_dataset = LanguageDataset(source_train_pad, target_train_pad)
    test_dataset = LanguageDataset(source_test_pad, target_test_pad)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    return (
        train_loader, test_loader,
        source_tokenizer, target_tokenizer, 
        max_source_len, max_target_len,
        len(target_tokenizer.word_index) + 1  # Vocabulary size
    )


file_path = 'fra.txt'

# Prepare data
train_loader, test_loader, source_tokenizer, target_tokenizer, max_source_len, max_target_len, target_vocab_size = prepare_data(
    file_path
)

# Example usage: iterate over the dataloader
for batch in train_loader:
    source_batch, target_batch = batch
    print("Source batch shape:", source_batch.shape)
    print("Target batch shape:", target_batch.shape)
    break



Source batch shape: torch.Size([32, 10])
Target batch shape: torch.Size([32, 10])


#Step-3: Define and train the RNN based Encoder-Decoder model
1. First, you need to define the sequential model consisting mainly of two parts Encoder and Decoder 
2. In Encoder, the input sequence shall be passed through an Embedding layer (to train the word embeddings for source language) and then the output from the Embedding layer may be passed through one or more RNN/LSTM layers.
3. Now, to connect this Encoder to Decoder (yet to be defined), we can use RepeatVector layer. (This is because the shape of the output by Encoder is not same as expected shape of Input by Decoder)
4. Now, stack up the Decoder, wherein you may add one or more RNN/LSTM layers and finally the output TimeDistributed Dense layer to get output separately by timesteps.
5. Now, you have defined the model and now this can be trained on the training data, you prepared in last step. Here, you may play with the number of epochs, optimizer, batch size to get the optimum results.

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers=1, dropout=0.2):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

    def forward(self, src):
        embedded = self.embedding(src) # [batch_size, src_len, emb_dim]
        outputs, (hidden, cell) = self.rnn(embedded) # outputs ignored; only need hidden, cell
        return hidden, cell
    
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers=1, dropout=0.2):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)  # Final Dense layer for timestep-specific outputs
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, hidden, cell):
        trg = trg.unsqueeze(1)  # Add a time dimension: [batch_size] -> [batch_size, 1]
        embedded = self.dropout(self.embedding(trg))  # [batch_size, 1, emb_dim]
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))  # [batch_size, 1, hidden_dim]
        prediction = self.fc(output.squeeze(1))  # [batch_size, output_dim]
        return prediction, hidden, cell
    
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        hidden, cell = self.encoder(src)

        # First input to the decoder is the <sos> token
        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output

            # Decide if we will use teacher forcing or not
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            input = trg[:, t] if teacher_force else output.argmax(1)

        return outputs

In [4]:
# Initialize the model
INPUT_DIM = len(source_tokenizer.word_index) + 1 # Source vocabulary size
OUTPUT_DIM = target_vocab_size  # Target vocabulary size
ENC_EMB_DIM = 128  # Encoder embedding size
DEC_EMB_DIM = 128  # Decoder embedding size
HIDDEN_DIM = 256  # LSTM hidden size
N_LAYERS = 1
DROPOUT = 0.2
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device being used: {DEVICE}")


encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT)

model = Seq2Seq(encoder, decoder, DEVICE).to(DEVICE)

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding index

Device being used: cuda




In [5]:
# Training function
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for src, trg in iterator:
        src, trg = src.to(DEVICE).long(), trg.to(DEVICE).long()

        optimizer.zero_grad()
        output = model(src, trg)  # [batch_size, trg_len, trg_vocab_size]

        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)  # Exclude <sos> token
        trg = trg[:, 1:].reshape(-1)  # Exclude <sos> token

        loss = criterion(output, trg)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Training loop
N_EPOCHS = 10
CLIP = 1

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    print(f"Epoch: {epoch+1}, Train Loss: {train_loss:.4f}")

Epoch: 1, Train Loss: 4.9880
Epoch: 2, Train Loss: 3.2446
Epoch: 3, Train Loss: 2.5833
Epoch: 4, Train Loss: 2.2002
Epoch: 5, Train Loss: 1.9389
Epoch: 6, Train Loss: 1.7519
Epoch: 7, Train Loss: 1.6119
Epoch: 8, Train Loss: 1.5031
Epoch: 9, Train Loss: 1.4110
Epoch: 10, Train Loss: 1.3389


#Step-4: Evaluating the model
Use BLEU score for evaluating your model using NLTK library

In [6]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def evaluate_model(model, data_loader, source_tokenizer, target_tokenizer, device):
    """
    Evaluates the model using BLEU score on the test dataset.

    Args:
        model: Trained Seq2Seq model.
        data_loader: DataLoader for the test set.
        source_tokenizer: Tokenizer for source language.
        target_tokenizer: Tokenizer for target language.
        device: Device (CPU or GPU).

    Returns:
        float: Average BLEU score for the test set.
    """
    model.eval()
    bleu_scores = []
    sos_token = 1  # Assuming <sos> is tokenized as 1
    eos_token = 2  # Assuming <eos> is tokenized as 2

    with torch.no_grad():
        for src, trg in data_loader:
            src = src.to(device)

            # Get the model predictions
            batch_size = src.shape[0]
            trg_len = trg.shape[1]
            outputs = torch.zeros(batch_size, trg_len).to(device, dtype=torch.long)

            hidden, cell = model.encoder(src)
            input_token = torch.full((batch_size,), sos_token, dtype=torch.long).to(device)

            for t in range(1, trg_len):
                output, hidden, cell = model.decoder(input_token, hidden, cell)
                top1 = output.argmax(1)
                outputs[:, t] = top1
                input_token = top1

            # Convert outputs to text
            for i in range(batch_size):
                predicted_seq = outputs[i].tolist()
                target_seq = trg[i].tolist()

                # Remove padding and special tokens
                predicted_text = [
                    target_tokenizer.index_word[token]
                    for token in predicted_seq
                    if token not in {0, sos_token, eos_token}
                ]
                target_text = [
                    target_tokenizer.index_word[token]
                    for token in target_seq
                    if token not in {0, sos_token, eos_token}
                ]

                # Calculate BLEU score for the sequence
                if target_text:  # Ensure target is not empty
                    bleu_score = sentence_bleu(
                        [target_text],
                        predicted_text,
                        smoothing_function=SmoothingFunction().method1,
                    )
                    bleu_scores.append(bleu_score)

    return sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0.0


# Evaluate the model
test_bleu_score = evaluate_model(
    model, test_loader, source_tokenizer, target_tokenizer, DEVICE
)
print(f"Average BLEU Score on Test Set: {test_bleu_score:.4f}")


Average BLEU Score on Test Set: 0.0970
