All the imports required.

In [None]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import os

import torch
import torch.utils.data
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import argparse

Since the data is in Unicode, we first strip the accents and replace every non-letter character with a space to standardize the text. We then simplify our dataset by filtering out sentence pairs where the English sentence exceeds ten tokens. The filtered pairs are saved to a new file named processed_eng_fra.txt.

To further simplify the dataset, we keep only the sentence pairs where the English sentence starts with one of a specific set of prefixes. Finally, we generate the vocabularies of all English and French words present in the filtered dataset.

In [None]:
def unicodeToAscii(s):
    """Strip combining marks (accents) from a Unicode string.

    The string is decomposed with NFD normalization and every character whose
    Unicode category is 'Mn' (nonspacing mark) is dropped. Characters without
    a decomposition are returned unchanged, so the result is not guaranteed
    to be pure ASCII.
    """
    decomposed = unicodedata.normalize('NFD', s)
    base_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(base_chars)

def normalizeString(s):
    """Lowercase, trim, de-accent and reduce a sentence to letters and spaces.

    Every maximal run of characters other than a-z / A-Z (punctuation,
    digits, apostrophes, whitespace, ...) is replaced by a single space, so a
    sentence that ends in punctuation keeps one trailing space.
    """
    cleaned = unicodeToAscii(s.lower().strip())
    # A single pass suffices: sentence punctuation is a non-letter like any
    # other, so it is absorbed into the surrounding non-letter run.
    return re.sub(r"[^a-zA-Z]+", " ", cleaned)

def process_file(input_file, output_file, max_tokens=10):
    """Normalize a tab-separated parallel corpus and write a filtered subset.

    Each input line is split on tabs; only the first two columns (English,
    French) are kept, so corpora that carry an extra attribution column still
    produce two-column output. Both columns are passed through
    ``normalizeString``. A pair is kept when the normalized English sentence
    starts with one of the prefixes below and has at most ``max_tokens``
    whitespace-separated tokens. Kept pairs are written to ``output_file``,
    one per line, and summary statistics are printed.

    Args:
        input_file: Path of the raw UTF-8 corpus.
        output_file: Path the filtered corpus is written to (overwritten).
        max_tokens: Maximum number of English tokens for a pair to be kept.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        lines = f.read().strip().split('\n')

    # Contraction prefixes end with a space so they only match the whole
    # word: "she s " matches "she s happy" but not "she sings".
    eng_prefixes = (
        "i am ", "i m ",
        "he is", "he s ",
        "she is", "she s ",
        "you are", "you re ",
        "we are", "we re ",
        "they are", "they re "
    )

    processed_lines = []
    filtered_count = 0
    prefix_filtered = 0

    for line in lines:
        # Keep only the sentence columns; anything after them is metadata.
        processed_parts = [normalizeString(part) for part in line.split('\t')[:2]]
        eng_part = processed_parts[0]

        # str.startswith accepts a tuple and tests every prefix in one call.
        has_prefix = eng_part.startswith(eng_prefixes)

        if has_prefix and len(eng_part.split()) <= max_tokens:
            processed_lines.append('\t'.join(processed_parts))
        else:
            filtered_count += 1
            # A line failing both tests is reported under "prefix" only.
            if not has_prefix:
                prefix_filtered += 1

    total_count = len(lines)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('\n'.join(processed_lines))

    print(f"Processed {total_count} lines")
    print(f"Filtered out {filtered_count} lines")
    print(f"  - {prefix_filtered} without required prefix")
    print(f"  - {filtered_count - prefix_filtered} with more than {max_tokens} tokens")
    print(f"Remaining: {len(processed_lines)} lines")
    print("First 5 processed examples:")
    for example in processed_lines[:5]:
        print(example)

# Reserved vocabulary indices shared by the source and target vocabularies:
# padding, start-of-sentence and end-of-sentence markers respectively.
PAD_token, SOS_token, EOS_token = 0, 1, 2

def build_vocab_from_data(pairs, max_vocab_size=None):
    """
    Build vocabularies for source and target languages

    Indices 0-2 are reserved for "<pad>", "<sos>" and "<eos>"; real words are
    numbered from 3. Without a size limit, words are numbered in order of
    first appearance. With a limit, the most frequent words are kept (ties
    keep first-appearance order) and the three special tokens count towards
    the limit.

    Args:
        pairs: List of sentence pairs [English, French]
        max_vocab_size: Optional limit on vocabulary size, special tokens
            included. None or 0 means no limit; a limit below 3 keeps the
            special tokens only.

    Returns:
        source_vocab: Dictionary mapping English words to indices
        source_index2word: Dictionary mapping indices to English words
        target_vocab: Dictionary mapping French words to indices
        target_index2word: Dictionary mapping indices to French words
        source_index: Size of the English vocabulary (next free index)
        target_index: Size of the French vocabulary (next free index)
    """
    specials = (("<pad>", PAD_token), ("<sos>", SOS_token), ("<eos>", EOS_token))
    first_word_index = 3

    def count_words(sentences):
        # Plain dicts keep insertion order, i.e. first-appearance order.
        counts = {}
        for sentence in sentences:
            for word in sentence.split():
                counts[word] = counts.get(word, 0) + 1
        return counts

    def select_words(counts):
        if not max_vocab_size:
            return list(counts)
        # Clamp at zero: a negative slice bound would count from the end and
        # keep almost every word instead of none.
        n_words = max(max_vocab_size - first_word_index, 0)
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        return [word for word, _ in ranked[:n_words]]

    def index_words(words):
        word2index = dict(specials)
        index2word = {index: token for token, index in specials}
        next_index = first_word_index
        for word in words:
            word2index[word] = next_index
            index2word[next_index] = word
            next_index += 1
        return word2index, index2word, next_index

    source_counts = count_words(pair[0] for pair in pairs)
    target_counts = count_words(pair[1] for pair in pairs)

    source_vocab, source_index2word, source_index = index_words(select_words(source_counts))
    target_vocab, target_index2word, target_index = index_words(select_words(target_counts))

    return source_vocab, source_index2word, target_vocab, target_index2word, source_index, target_index

def load_sentence_pairs(file_path):
    """Read tab-separated sentence pairs from a processed corpus file.

    Each line is stripped of surrounding whitespace and split on tabs; only
    lines that yield exactly two fields are returned, each as a two-element
    list. All other lines are skipped silently.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        split_rows = (row.strip().split('\t') for row in handle)
        return [fields for fields in split_rows if len(fields) == 2]

# Build the processed corpus first when it is missing, so that running this
# cell from a clean checkout does not fail with FileNotFoundError on the load.
if not os.path.exists('processed_eng_fra.txt'):
    process_file('eng-fra.txt', 'processed_eng_fra.txt', max_tokens=10)

pairs = load_sentence_pairs('processed_eng_fra.txt')
# The last two return values are the vocabulary sizes (special tokens included).
eng_vocab, eng_index2word, fra_vocab, fra_index2word, eng_vocab_size, fra_vocab_size = build_vocab_from_data(pairs)

print(f"English vocabulary size: {eng_vocab_size}")
print(f"French vocabulary size: {fra_vocab_size}")
print(f"Sample English words: {list(eng_vocab.items())[:10]}")
print(f"Sample French words: {list(fra_vocab.items())[:10]}")

def sentence_to_indices(sentence, vocab, max_length, add_sos_eos=True):
    """Convert a sentence to a fixed-length list of vocabulary indices.

    Args:
        sentence: Whitespace-separated words.
        vocab: Dictionary mapping words to indices. Words missing from it are
            encoded as PAD_token (there is no dedicated unknown-word index).
        max_length: Length of the returned list. Shorter sequences are padded
            on the right with PAD_token; longer ones are truncated.
        add_sos_eos: When true, SOS_token is prepended and EOS_token appended
            before padding/truncation.

    Returns:
        A list of ``max_length`` indices. When truncation happens with
        ``add_sos_eos`` set and ``max_length >= 2``, the last kept position is
        overwritten with EOS_token so the sequence stays terminated.
    """
    indices = [vocab.get(word, PAD_token) for word in sentence.split()]

    if add_sos_eos:
        indices = [SOS_token] + indices + [EOS_token]

    if len(indices) > max_length:
        indices = indices[:max_length]
        # Plain slicing would cut off the end-of-sentence marker; put it back
        # unless that would overwrite the start marker.
        if add_sos_eos and max_length >= 2:
            indices[-1] = EOS_token
    else:
        indices += [PAD_token] * (max_length - len(indices))

    return indices

def indices_to_sentence(indices, index2word):
    """Decode a sequence of indices back into a space-joined sentence.

    Decoding stops at the first EOS_token; SOS_token and PAD_token entries
    are skipped. Every other index is looked up in ``index2word``.
    """
    decoded = []
    for token_id in indices:
        if token_id == EOS_token:
            break
        if token_id == SOS_token or token_id == PAD_token:
            continue
        decoded.append(index2word[token_id])
    return ' '.join(decoded)

# Regenerate the filtered corpus from the raw file.
# NOTE(review): the module-level load_sentence_pairs(...) call earlier in this
# cell reads this file before this line runs - consider moving this call ahead of it.
process_file(input_file='eng-fra.txt', output_file='processed_eng_fra.txt', max_tokens=10)

English vocabulary size: 3381
French vocabulary size: 5129
Sample English words: [('<pad>', 0), ('<sos>', 1), ('<eos>', 2), ('i', 3), ('m', 4), ('ok', 5), ('fat', 6), ('fit', 7), ('hit', 8), ('ill', 9)]
Sample French words: [('<pad>', 0), ('<sos>', 1), ('<eos>', 2), ('j', 3), ('ai', 4), ('ans', 5), ('je', 6), ('vais', 7), ('bien', 8), ('ca', 9)]
Processed 135842 lines
Filtered out 122933 lines
  - 122377 without required prefix
  - 556 with more than 10 tokens
Remaining: 12909 lines
First 5 processed examples:
i m 	j ai ans 
i m ok 	je vais bien 
i m ok 	ca va 
i m fat 	je suis gras 
i m fat 	je suis gros 


This is the main module, where we define the encoder, the decoder, and the linear connector that bridges them. The encoder is a bidirectional LSTM and the decoder is a unidirectional LSTM.

In [None]:
class EncoderRNN(nn.Module):
    """Bidirectional LSTM encoder over embedded token indices."""

    def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.1):
        """Create the embedding, dropout and bidirectional LSTM layers.

        input_size: number of rows in the embedding table
        hidden_size: embedding width and LSTM hidden width (per direction)
        num_layers: number of stacked LSTM layers
        dropout: embedding dropout; also applied between LSTM layers when
            num_layers > 1
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        # nn.LSTM only applies dropout between layers, so pass 0 for one layer.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            hidden_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=inter_layer_dropout,
        )
        self._init_weights()

    def _init_weights(self):
        """Initialize weights with smaller values to prevent gradient explosion"""
        # Checked in order; the first marker contained in the parameter name wins.
        initializers = (
            ('weight_ih', lambda t: nn.init.xavier_uniform_(t, gain=0.5)),
            ('weight_hh', lambda t: nn.init.orthogonal_(t, gain=0.5)),
            ('bias', lambda t: nn.init.constant_(t, 0)),
            ('embedding.weight', lambda t: nn.init.uniform_(t, -0.1, 0.1)),
        )
        for name, param in self.named_parameters():
            for marker, initialize in initializers:
                if marker in name:
                    initialize(param.data)
                    break

    def forward(self, input_seq, hidden=None):
        """
        input_seq: batch_size x seq_len
        hidden: optional initial (hidden, cell) state passed to the LSTM

        Returns the LSTM outputs and the final (hidden, cell) state.
        """
        embedded = self.embedding(input_seq)
        embedded = self.dropout(embedded)
        outputs, (final_hidden, final_cell) = self.lstm(embedded, hidden)
        return outputs, (final_hidden, final_cell)

class DecoderRNN(nn.Module):
    """LSTM decoder that emits vocabulary logits one token at a time."""

    def __init__(self, hidden_size, output_size, num_layers=1, dropout=0.1):
        """Create the embedding, dropout, LSTM and output projection layers.

        hidden_size: embedding width and LSTM hidden width
        output_size: number of rows in the embedding table and of output logits
        num_layers: number of stacked LSTM layers
        dropout: embedding dropout; also applied between LSTM layers when
            num_layers > 1
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        # nn.LSTM only applies dropout between layers, so pass 0 for one layer.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            hidden_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.out = nn.Linear(hidden_size, output_size)
        self._init_weights()

    def _init_weights(self):
        """Initialize weights with smaller values to prevent gradient explosion"""
        # Checked in order; the first marker contained in the parameter name wins.
        initializers = (
            ('weight_ih', lambda t: nn.init.xavier_uniform_(t, gain=0.5)),
            ('weight_hh', lambda t: nn.init.orthogonal_(t, gain=0.5)),
            ('bias', lambda t: nn.init.constant_(t, 0)),
            ('embedding.weight', lambda t: nn.init.uniform_(t, -0.1, 0.1)),
            ('out.weight', lambda t: nn.init.xavier_uniform_(t, gain=0.5)),
        )
        for name, param in self.named_parameters():
            for marker, initialize in initializers:
                if marker in name:
                    initialize(param.data)
                    break

    def forward(self, input_token, hidden):
        """
        input_token: batch_size tensor of indices (a trailing singleton
            dimension is squeezed away)
        hidden: tuple of (hidden_state, cell_state)

        Returns batch_size x output_size logits and the updated state tuple.
        """
        token_ids = input_token.squeeze(-1) if input_token.dim() > 1 else input_token
        # Add a length-1 sequence dimension for the batch_first LSTM.
        step_input = self.dropout(self.embedding(token_ids).unsqueeze(1))
        lstm_out, next_state = self.lstm(step_input, hidden)
        logits = self.out(lstm_out.squeeze(1))
        return logits, next_state

class EncoderDecoderConnector(nn.Module):
    """Linear layer to connect bidirectional encoder to unidirectional decoder"""

    def __init__(self, encoder_hidden_size, decoder_hidden_size, num_layers=1):
        """Create one projection for hidden states and one for cell states.

        Both map the concatenated forward/backward encoder state
        (2 * encoder_hidden_size) to decoder_hidden_size and are shared
        across layers.
        """
        super().__init__()
        concat_size = encoder_hidden_size * 2
        self.hidden_connector = nn.Linear(concat_size, decoder_hidden_size)
        self.cell_connector = nn.Linear(concat_size, decoder_hidden_size)
        self.num_layers = num_layers
        self._init_weights()

    def _init_weights(self):
        """Initialize weights with smaller values to prevent gradient explosion"""
        for projection in (self.hidden_connector, self.cell_connector):
            nn.init.xavier_uniform_(projection.weight, gain=0.5)
            nn.init.constant_(projection.bias, 0)

    def forward(self, encoder_hidden, encoder_cell):
        """
        Reshape and transform encoder final states for decoder initial states
        encoder_hidden: 2*num_layers x batch_size x hidden_size
        encoder_cell: 2*num_layers x batch_size x hidden_size

        Returns (decoder_hidden, decoder_cell), each stacked over layers
        along dimension 0.
        """
        hidden_per_layer = []
        cell_per_layer = []
        for layer in range(self.num_layers):
            # Entries 2*layer and 2*layer + 1 are read as the forward and
            # backward direction of this layer.
            fwd, bwd = 2 * layer, 2 * layer + 1
            joined_hidden = torch.cat((encoder_hidden[fwd], encoder_hidden[bwd]), dim=1)
            joined_cell = torch.cat((encoder_cell[fwd], encoder_cell[bwd]), dim=1)
            hidden_per_layer.append(torch.tanh(self.hidden_connector(joined_hidden)))
            cell_per_layer.append(torch.tanh(self.cell_connector(joined_cell)))
        return torch.stack(hidden_per_layer, dim=0), torch.stack(cell_per_layer, dim=0)

We train the model with teacher forcing and apply the gradient clipping provided by PyTorch.

In [None]:
def create_dataloader(pairs, src_vocab, tgt_vocab, batch_size, max_input_length, max_output_length):
    """Encode sentence pairs and wrap them in a shuffling DataLoader.

    Each source/target sentence is converted with ``sentence_to_indices`` to
    a row of ``max_input_length`` / ``max_output_length`` indices. The rows
    form two long tensors that are served in shuffled batches of
    ``batch_size`` as (input_ids, target_ids).
    """
    source_rows = []
    target_rows = []
    for src, tgt in pairs:
        source_rows.append(sentence_to_indices(src, src_vocab, max_input_length))
        target_rows.append(sentence_to_indices(tgt, tgt_vocab, max_output_length))

    n_pairs = len(pairs)
    # reshape keeps the (n_pairs, length) layout even when there are no rows.
    input_ids = torch.tensor(source_rows, dtype=torch.long).reshape(n_pairs, max_input_length)
    target_ids = torch.tensor(target_rows, dtype=torch.long).reshape(n_pairs, max_output_length)

    dataset = torch.utils.data.TensorDataset(input_ids, target_ids)
    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

def train_model(encoder, decoder, connector,
                src_vocab, tgt_vocab,
                pairs,
                n_epochs=80,
                batch_size=32,
                max_input_length=10,
                max_output_length=15,
                learning_rate=0.0005,
                teacher_forcing_ratio=0.8,
                clip=0.5):
    """Train the encoder, connector and decoder on sentence pairs.

    Args:
        encoder, decoder, connector: Modules to train; they are moved to CUDA
            when available, otherwise to the CPU.
        src_vocab, tgt_vocab: Word-to-index dictionaries used to encode pairs.
        pairs: Sequence of (source sentence, target sentence) pairs.
        n_epochs: Number of passes over the data.
        batch_size: Number of pairs per batch.
        max_input_length, max_output_length: Fixed encoded sequence lengths.
        learning_rate: Learning rate of the three Adam optimizers.
        teacher_forcing_ratio: Probability, drawn once per batch, of feeding
            the reference token instead of the decoder's own prediction.
        clip: Maximum gradient norm, applied separately to each module.

    Returns:
        List with the average batch loss of every epoch that processed at
        least one batch.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    encoder.to(device)
    decoder.to(device)
    connector.to(device)
    train_dataloader = create_dataloader(pairs, src_vocab, tgt_vocab, batch_size, max_input_length, max_output_length)

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate, weight_decay=1e-6, eps=1e-8)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate, weight_decay=1e-6, eps=1e-8)
    connector_optimizer = optim.Adam(connector.parameters(), lr=learning_rate, weight_decay=1e-6, eps=1e-8)

    criterion = nn.CrossEntropyLoss(ignore_index=PAD_token, reduction='mean', label_smoothing=0.1)
    all_losses = []

    for epoch in range(1, n_epochs + 1):
        encoder.train()
        decoder.train()
        connector.train()
        epoch_loss = 0
        batch_count = 0

        for input_tensor, target_tensor in train_dataloader:
            input_tensor = input_tensor.to(device)
            target_tensor = target_tensor.to(device)

            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()
            connector_optimizer.zero_grad()

            # The last batch may be smaller than the batch_size argument.
            current_batch_size = input_tensor.size(0)
            target_length = target_tensor.size(1)
            # Position 0 of every target row is <sos>, which is fed as the
            # first decoder input; the remaining positions are predicted.
            n_steps = min(target_length - 1, max_output_length)

            _, (encoder_hidden, encoder_cell) = encoder(input_tensor)
            decoder_hidden, decoder_cell = connector(encoder_hidden, encoder_cell)
            decoder_input = torch.tensor([SOS_token] * current_batch_size, device=device)
            step_losses = []
            use_teacher_forcing = random.random() < teacher_forcing_ratio

            for t in range(n_steps):
                decoder_output, (decoder_hidden, decoder_cell) = decoder(
                    decoder_input, (decoder_hidden, decoder_cell)
                )
                # Step t consumes token t and must predict token t + 1.
                step_target = target_tensor[:, t + 1]
                step_loss = criterion(decoder_output, step_target)
                # The loss is NaN when every target of this step is padding.
                if not torch.isnan(step_loss):
                    step_losses.append(step_loss)

                if use_teacher_forcing:
                    decoder_input = step_target
                else:
                    _, topi = decoder_output.topk(1)
                    # Squeeze only the top-k dimension so a batch of one
                    # stays a 1-D tensor.
                    decoder_input = topi.squeeze(-1).detach()
                    if (decoder_input == EOS_token).all():
                        break

            if not step_losses:
                # Nothing to optimize (e.g. the batch holds only padding).
                continue

            loss = torch.stack(step_losses).mean()
            loss.backward()

            torch.nn.utils.clip_grad_norm_(encoder.parameters(), clip)
            torch.nn.utils.clip_grad_norm_(decoder.parameters(), clip)
            torch.nn.utils.clip_grad_norm_(connector.parameters(), clip)

            encoder_optimizer.step()
            decoder_optimizer.step()
            connector_optimizer.step()

            epoch_loss += loss.item()
            batch_count += 1

        if batch_count > 0:
            avg_epoch_loss = epoch_loss / batch_count
            all_losses.append(avg_epoch_loss)
            print(f"Epoch {epoch}/{n_epochs} | Average Loss: {avg_epoch_loss:.4f}")

    return all_losses

def evaluate(encoder, decoder, connector, sentence, src_vocab, tgt_index2word,
             max_input_length, max_output_length, device):
    """Generate translation for a single sentence

    The sentence is encoded with ``sentence_to_indices`` and decoded greedily
    (highest-scoring token at every step) for at most ``max_output_length``
    steps, without gradient tracking.

    Returns:
        List of decoded words. Padding and start-of-sentence predictions are
        omitted; if the end-of-sentence token is predicted, the list ends
        with the marker '<EOS>'.
    """
    input_indices = sentence_to_indices(sentence, src_vocab, max_input_length)
    input_tensor = torch.tensor(input_indices, dtype=torch.long, device=device).unsqueeze(0)

    modules = (encoder, decoder, connector)
    # Remember each module's mode so the caller's state is restored as found,
    # not forced back to training mode.
    previous_modes = [module.training for module in modules]
    for module in modules:
        module.eval()

    decoded_words = []
    try:
        with torch.no_grad():
            _, (encoder_hidden, encoder_cell) = encoder(input_tensor)
            decoder_hidden, decoder_cell = connector(encoder_hidden, encoder_cell)
            decoder_input = torch.tensor([SOS_token], device=device)

            for _ in range(max_output_length):
                decoder_output, (decoder_hidden, decoder_cell) = decoder(
                    decoder_input, (decoder_hidden, decoder_cell)
                )
                _, topi = decoder_output.topk(1)
                token = topi.item()

                if token == EOS_token:
                    decoded_words.append('<EOS>')
                    break
                # Skip structural tokens, matching indices_to_sentence.
                if token not in (PAD_token, SOS_token):
                    decoded_words.append(tgt_index2word[token])

                decoder_input = topi.detach()
    finally:
        for module, was_training in zip(modules, previous_modes):
            module.train(was_training)

    return decoded_words

def evaluate_sample(encoder, decoder, connector, src_vocab, tgt_vocab, sentence,
                    max_input_length, max_output_length, device):
    """Translate one sentence with ``evaluate`` and print input and output.

    The index-to-word table is derived by inverting ``tgt_vocab``. Output
    text from the '<EOS>' marker onwards is cut off before printing.
    """
    index_to_word = {}
    for word, idx in tgt_vocab.items():
        index_to_word[idx] = word

    print("\nSample Translation:")
    print(f"Input: {sentence}")
    predicted_words = evaluate(encoder, decoder, connector, sentence, src_vocab, index_to_word,
                               max_input_length, max_output_length, device)
    output_sentence = ' '.join(predicted_words)

    # partition returns an empty separator when the marker is absent.
    text_before_eos, eos_marker, _ = output_sentence.partition('<EOS>')
    if eos_marker:
        output_sentence = text_before_eos
        print("EOS predicted")

    print(f"Output: {output_sentence}")
    print("-" * 50)

In [None]:
from sklearn.model_selection import train_test_split

def main():
    """Train the translation model, print sample translations and save it.

    Loads 'processed_eng_fra.txt', holds out 20% of the pairs for the sample
    translations, builds the vocabularies from all pairs, trains on the
    remaining 80% and writes the weights and vocabularies to
    'seq2seq_model.pt'.
    """
    # Model architecture.
    hidden_size, num_layers, dropout = 128, 1, 0.2
    # Sequence lengths are shared by training and evaluation.
    max_input_length, max_output_length = 10, 15
    training_options = dict(
        n_epochs=100,
        batch_size=16,
        max_input_length=max_input_length,
        max_output_length=max_output_length,
        learning_rate=0.001,
        teacher_forcing_ratio=0.8,
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print("Loading data...")
    pairs = load_sentence_pairs('processed_eng_fra.txt')

    train_pairs, test_pairs = train_test_split(pairs, test_size=0.2, random_state=42)

    print("Building vocabularies...")
    # The vocabularies cover train and test pairs alike.
    (src_vocab, src_index2word,
     tgt_vocab, tgt_index2word,
     src_size, tgt_size) = build_vocab_from_data(pairs)

    encoder = EncoderRNN(src_size, hidden_size, num_layers, dropout)
    decoder = DecoderRNN(hidden_size, tgt_size, num_layers, dropout)
    connector = EncoderDecoderConnector(hidden_size, hidden_size, num_layers)

    print("Starting training...")
    train_model(encoder, decoder, connector, src_vocab, tgt_vocab, train_pairs, **training_options)

    print("\nEvaluating on 10 random test samples:")
    random.shuffle(test_pairs)
    for src, tgt in test_pairs[:10]:
        output_words = evaluate(encoder, decoder, connector, src, src_vocab, tgt_index2word,
                                max_input_length, max_output_length, device)
        output_sentence = ' '.join(output_words).replace('<EOS>', '')
        print(f"\nInput:  {src}")
        print(f"Target: {tgt}")
        print(f"Output: {output_sentence}")
        print("-" * 50)

    checkpoint = {
        'encoder_state_dict': encoder.state_dict(),
        'decoder_state_dict': decoder.state_dict(),
        'connector_state_dict': connector.state_dict(),
        'src_vocab': src_vocab,
        'tgt_vocab': tgt_vocab,
        'src_index2word': src_index2word,
        'tgt_index2word': tgt_index2word,
    }
    torch.save(checkpoint, 'seq2seq_model.pt')
    print("Model saved to seq2seq_model.pt")


# Run the full pipeline only when executed as a script / notebook cell.
if __name__ == "__main__":
    main()

Loading data...
Building vocabularies...
Starting training...
Epoch 1/100 | Average Loss: 4.9196
Epoch 2/100 | Average Loss: 4.4319
Epoch 3/100 | Average Loss: 4.2186
Epoch 4/100 | Average Loss: 4.0630
Epoch 5/100 | Average Loss: 3.9356
Epoch 6/100 | Average Loss: 3.8279
Epoch 7/100 | Average Loss: 3.7348
Epoch 8/100 | Average Loss: 3.6596
Epoch 9/100 | Average Loss: 3.5724
Epoch 10/100 | Average Loss: 3.4162
Epoch 11/100 | Average Loss: 3.3514
Epoch 12/100 | Average Loss: 3.2612
Epoch 13/100 | Average Loss: 3.2140
Epoch 14/100 | Average Loss: 3.0584
Epoch 15/100 | Average Loss: 3.0649
Epoch 16/100 | Average Loss: 3.0570
Epoch 17/100 | Average Loss: 3.0194
Epoch 18/100 | Average Loss: 2.8732
Epoch 19/100 | Average Loss: 2.9340
Epoch 20/100 | Average Loss: 2.8854
Epoch 21/100 | Average Loss: 2.8497
Epoch 22/100 | Average Loss: 2.7914
Epoch 23/100 | Average Loss: 2.6690
Epoch 24/100 | Average Loss: 2.7513
Epoch 25/100 | Average Loss: 2.7667
Epoch 26/100 | Average Loss: 2.6297
Epoch 27/10

References for the above code:

https://docs.pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial -> pytorch tutorial on seq2seq translation.

https://github.com/astorfi/sequence-to-sequence-from-scratch/tree/master -> tutorial made by Sina Torfi
