In [None]:
import os

url_path = "https://raw.githubusercontent.com/bzitko/nlp_repo/main/assignments/a05/mt/"
# Maps each file to download onto the path it should be extracted to
# (None means the downloaded file is used as-is).
downloads = {"eng_fra_simplest.csv": None}

for download_name, extract_name in downloads.items():
    # Nothing to do when the extracted artefact is already on disk.
    if extract_name and os.path.exists(extract_name):
        continue

    if not os.path.exists(download_name):
        # Imported lazily: a cached copy can be reused without `requests` installed.
        import requests

        response = requests.get(f"{url_path}{download_name}", timeout=60)
        try:
            # Fail loudly on 4xx/5xx instead of saving the error page as if it
            # were the data file (which would only surface later as a parse error).
            response.raise_for_status()
            with open(download_name, "wb") as fp:
                fp.write(response.content)
        finally:
            response.close()

    if not extract_name:
        continue

    # Pick the decompression routine from the download's file extension.
    _, ext = os.path.splitext(download_name)
    if ext == ".bz2":
        import bz2
        with open(download_name, 'rb') as bzf, open(extract_name, 'wb') as fp:
            fp.write(bz2.decompress(bzf.read()))
    elif ext == ".zip":
        from zipfile import ZipFile
        # NOTE: the archive is unpacked into the working directory; `extract_name`
        # is only used for the "already extracted" check above.
        with ZipFile(download_name) as zf:
            zf.extractall(path=".")

# Machine Translation English to French

## 1 Preprocessing
### 1.1 Imports and Tokenizer

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from typing import List, Tuple
import random

# Dummy tokenizer (replace with your tokenizer)
def tokenize(text: str) -> List[str]:
    """Split `text` on runs of whitespace and return the resulting tokens."""
    tokens = text.split()
    return tokens

### 1.2 Vocabulary

This class defines a vocabulary for converting words into indices and vice versa. The `__init__` method initializes the vocabulary with special tokens for padding (`<pad>`), start of sentence (`<sos>`), end of sentence (`<eos>`), and unknown words (`<unk>`). It creates mappings between words and indices (`word2idx`) and vice versa (`idx2word`). The `indices_to_sentence` method converts a list of indices back into a sentence by mapping each index to the corresponding word, ignoring padding and special tokens like `<eos>`.

In [None]:
# Vocabulary class
class Vocabulary:
    """Bidirectional mapping between tokens and integer indices.

    A new vocabulary contains only the four special tokens:
    `<pad>` (0), `<sos>` (1), `<eos>` (2) and `<unk>` (3).

    Attributes:
        pad_idx, sos_idx, eos_idx, unk_idx: indices of the special tokens.
        word2idx: dict mapping token -> index.
        idx2word: dict mapping index -> token (inverse of `word2idx`).
    """

    def __init__(self):
        self.pad_idx = 0
        self.sos_idx = 1
        self.eos_idx = 2
        self.unk_idx = 3

        self.word2idx = {"<pad>": self.pad_idx,
                         "<sos>": self.sos_idx,
                         "<eos>": self.eos_idx,
                         "<unk>": self.unk_idx}

        # Derived from word2idx so both directions always start out consistent.
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

    def indices_to_sentence(self, indices: List[int]) -> List[str]:
        """Map indices back to tokens, dropping `<pad>`, `<sos>` and `<eos>`.

        `<unk>` is kept so unknown words stay visible in the output.

        Raises:
            KeyError: if an index is not present in `idx2word`.
        """
        # Previously `eos_idx` was listed twice and `sos_idx` was missing,
        # so `<sos>` leaked into the decoded sentence.
        structural = (self.pad_idx, self.sos_idx, self.eos_idx)
        return [self.idx2word[idx] for idx in indices if idx not in structural]



👍 Add each word in a tokenized sentence to the vocabulary, assigning it a unique index if it’s not already present.

Implement `add_sentence` method. Loop through each word in the input `sentence`. If the word is not in `word2idx`, assign it the next available index using `len(self.word2idx)`, and add the word and its index to both `word2idx` and `idx2word`.

In [None]:
def add_sentence(self, sentence: List[str]):
    """Register every unseen token of `sentence` in the vocabulary.

    Each new token receives the next free index (the current size of
    `word2idx`) and is recorded in both `word2idx` and `idx2word`.
    Tokens that are already known are left untouched.
    """
    for token in sentence:
        if token in self.word2idx:
            continue
        next_index = len(self.word2idx)
        self.word2idx[token] = next_index
        self.idx2word[next_index] = token

# Attach the function to the class so that Vocabulary instances expose it as a method.
Vocabulary.add_sentence = add_sentence

# Initialize vocabulary
vocab = Vocabulary()

# Test 1: Adding a sentence
# Indices 0-3 are taken by the special tokens, so the first new word gets index 4.
sentence = ["I", "am", "learning"]
vocab.add_sentence(sentence)
assert "I" in vocab.word2idx, "Test 1 failed: 'I' should be in word2idx."
assert vocab.word2idx["I"] == 4, "Test 1 failed: Index for 'I' should be 4."

# Test 2: Adding the same sentence again (no new words)
# 4 special tokens + 3 words = 7 entries.
vocab.add_sentence(sentence)
assert len(vocab.word2idx) == 7, "Test 2 failed: No new words should be added."

# Test 3: Adding a new word
sentence = ["new", "word"]
vocab.add_sentence(sentence)
assert "new" in vocab.word2idx, "Test 3 failed: 'new' should be in word2idx."
assert vocab.word2idx["new"] == 7, "Test 3 failed: Index for 'new' should be 7."

# Test 4: Checking special tokens
# Adding words must never move the reserved indices.
assert vocab.word2idx["<pad>"] == 0, "Test 4 failed: <pad> should have index 0."
assert vocab.word2idx["<sos>"] == 1, "Test 4 failed: <sos> should have index 1."
assert vocab.word2idx["<eos>"] == 2, "Test 4 failed: <eos> should have index 2."
assert vocab.word2idx["<unk>"] == 3, "Test 4 failed: <unk> should have index 3."


👍 Convert a tokenized sentence into a list of indices using the vocabulary, adding start-of-sequence (`<sos>`) at the beginning and end-of-sequence (`<eos>`) at the end.

First, add the start-of-sequence index (`sos_idx`) at the beginning of the list. Then, for each word in the sentence, get its index from `word2idx`. If the word is not in the vocabulary, use the unknown word index (`unk_idx`). Finally, add the end-of-sequence index (`eos_idx`) at the end of the list and return the list of indices.

In [None]:
def sentence_to_indices(self, sentence: List[str]) -> List[int]:
    """Encode a tokenized sentence as `<sos>`, word indices, `<eos>`.

    Words missing from `word2idx` are encoded as `unk_idx`.
    """
    encoded = [self.sos_idx]
    for word in sentence:
        encoded.append(self.word2idx.get(word, self.unk_idx))
    encoded.append(self.eos_idx)
    return encoded

# Attach the function to the class so that Vocabulary instances expose it as a method.
Vocabulary.sentence_to_indices = sentence_to_indices

# Initialize vocabulary and add some sentences
# Resulting indices: I=4, am=5, learning=6, love=7, coding=8.
vocab = Vocabulary()
vocab.add_sentence(["I", "am", "learning"])
vocab.add_sentence(["I", "love", "coding"])

# Test 1: Converting a sentence to indices
# Expected layout: <sos>=1, the word indices, <eos>=2.
sentence = ["I", "am", "learning"]
indices = vocab.sentence_to_indices(sentence)
assert indices == [1, 4, 5, 6, 2], "Test 1 failed: Incorrect indices."

# Test 2: Handling an unknown word
# "sleeping" was never added, so it maps to <unk>=3.
sentence = ["I", "am", "sleeping"]
indices = vocab.sentence_to_indices(sentence)
assert indices == [1, 4, 5, 3, 2], "Test 2 failed: Incorrect indices with unknown word."

# Test 3: Empty sentence
# Only the two boundary tokens remain.
sentence = []
indices = vocab.sentence_to_indices(sentence)
assert indices == [1, 2], "Test 3 failed: Incorrect indices for empty sentence."

# Test 4: Check special tokens
assert vocab.sentence_to_indices(["I"]) == [1, 4, 2], "Test 4 failed: Incorrect indices for sentence with a single word."


👍 Convert a list of indices back into a sentence, excluding the special tokens `<pad>`, `<sos>` and `<eos>`.

For each index in the list of `indices`, look up the corresponding word in `idx2word`. Exclude the special tokens (`<pad>`, `<sos>` and `<eos>`) and return the resulting list of words.


In [None]:
def indices_to_sentence(self, indices: List[int]) -> List[str]:
    """Decode indices into words, skipping `<pad>`, `<sos>` and `<eos>`.

    Every other index (including `<unk>`) is looked up in `idx2word`.
    """
    skipped = (self.pad_idx, self.sos_idx, self.eos_idx)
    words = []
    for idx in indices:
        if idx in skipped:
            continue
        words.append(self.idx2word[idx])
    return words

# Replace the method defined in the class body with the implementation above.
Vocabulary.indices_to_sentence = indices_to_sentence

# Initialize vocabulary and add some sentences
# Resulting indices: I=4, am=5, learning=6.
vocab = Vocabulary()
vocab.add_sentence(["I", "am", "learning"])

# Test 1: Convert indices back to sentence
# The leading <sos>=1 and trailing <eos>=2 must be stripped.
indices = [1, 4, 5, 6, 2]
sentence = vocab.indices_to_sentence(indices)
assert sentence == ["I", "am", "learning"], "Test 1 failed: Incorrect sentence conversion."

# Test 2: Handle indices with special tokens
# <unk>=3 is not filtered out; it is rendered as the literal "<unk>".
indices = [1, 4, 5, 3, 2]
sentence = vocab.indices_to_sentence(indices)
assert sentence == ["I", "am", "<unk>"], "Test 2 failed: Incorrect sentence with unknown word."

# Test 3: Handle empty indices
indices = [1, 2]
sentence = vocab.indices_to_sentence(indices)
assert sentence == [], "Test 3 failed: Incorrect sentence for empty indices."

# Test 4: Exclude special tokens
# Trailing <pad>=0 entries (as produced by batching) are dropped too.
indices = [1, 4, 2, 0, 0]
sentence = vocab.indices_to_sentence(indices)
assert sentence == ["I"], "Test 4 failed: Incorrect handling of <pad> token."



### 1.3 Dataset

This class defines a custom dataset for machine translation. It takes parallel lists of source and target sentences, along with their respective vocabularies, and converts each sentence into a sequence of word indices. In the `__init__` method, it checks that the source and target lists contain the same number of sentences and then converts each sentence into indices using the corresponding vocabulary. The `__len__` method returns the total number of sentence pairs, while `__getitem__` retrieves a source and target sentence pair (as tensors) based on an index. This class is used for loading data efficiently during training.

In [None]:
# Dataset class
class TranslationDataset(Dataset):
    """Parallel corpus of index-encoded source/target sentences.

    Each raw sentence is tokenized with `tokenize` and encoded with its
    vocabulary's `sentence_to_indices` once, at construction time.
    """

    def __init__(self, source_sentences: List[str], target_sentences: List[str],
                 src_vocab: Vocabulary, tgt_vocab: Vocabulary):
        assert len(source_sentences) == len(target_sentences), \
            "Source and target sentences must contain same number of sentences."

        def encode_all(vocabulary, sentences):
            # Tokenize and index-encode every sentence with the given vocabulary.
            return [vocabulary.sentence_to_indices(tokenize(text)) for text in sentences]

        self.src_sentences = encode_all(src_vocab, source_sentences)
        self.tgt_sentences = encode_all(tgt_vocab, target_sentences)

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.src_sentences)

    def __getitem__(self, idx):
        """Return the `idx`-th (source, target) pair as two index tensors."""
        source = torch.tensor(self.src_sentences[idx])
        target = torch.tensor(self.tgt_sentences[idx])
        return source, target


## 2 Model
### 2.1 Encoder

👍 Process a sequence of input indices through the embedding layer and the GRU RNN layer, then return the hidden state from the RNN.

Implement `forward` method. First, pass the input sequence (`src`) through the embedding layer to obtain embeddings. Then, feed these embeddings into the GRU to obtain the RNN's outputs and hidden state. Return the hidden state.

In [None]:
# Encoder
class EncoderRNN(nn.Module):
    """Embeds a batch of source index sequences and summarises them with a GRU.

    Args:
        input_dim: number of rows in the embedding table (source vocabulary size).
        embed_dim: size of each embedding vector.
        hidden_dim: size of the GRU hidden state.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=embed_dim)
        self.rnn = nn.GRU(input_size=embed_dim, hidden_size=hidden_dim,
                          num_layers=n_layers, batch_first=True)

    def forward(self, src):
        """Return the GRU's final hidden state for `src` (batch-first indices).

        The per-step GRU outputs are discarded; only the hidden state is returned.
        """
        _, final_hidden = self.rnn(self.embedding(src))
        return final_hidden
    

# Initialize vocabulary, EncoderRNN, and input
vocab = Vocabulary()
vocab.add_sentence(["I", "am", "learning"])

# The embedding table needs one row per vocabulary entry (4 special tokens + 3 words).
encoder = EncoderRNN(input_dim=len(vocab.word2idx), embed_dim=8, hidden_dim=16, n_layers=2)
src = torch.tensor([1, 4, 5, 6, 2]).unsqueeze(0)  # Example input sequence (batch size 1)

# Test 1: Check the shape of hidden state returned by the forward pass
# Expected shape is (n_layers, batch size, hidden_dim).
hidden = encoder(src)
assert hidden.shape == (2, 1, 16), "Test 1 failed: Hidden state shape is incorrect."

# Test 2: Check if the output is a tensor
assert isinstance(hidden, torch.Tensor), "Test 2 failed: Output should be a torch tensor."

# Test 3: Check the hidden state for different input sequences
# The hidden-state shape must not depend on the sequence length (3 tokens here instead of 5).
src_2 = torch.tensor([1, 3, 2]).unsqueeze(0)
hidden_2 = encoder(src_2)
assert hidden_2.shape == (2, 1, 16), "Test 3 failed: Hidden state shape is incorrect for different input."


### 2.2 Decoder

👍 Process a target sequence (`tgt`) through the embedding layer, pass it through the GRU layer using the given hidden state, and generate predictions using the output of the GRU layer.

Implement `forward` method. First, pass the target sequence (`tgt`) through the embedding layer to get the embeddings. Then, pass the embeddings and the hidden state through the GRU to get the RNN's outputs and updated hidden state. Finally, use a fully connected layer (`fc_out`) to make predictions from the GRU's outputs. Return both the predictions and the updated hidden state.

In [None]:
# Decoder
class DecoderRNN(nn.Module):
    """Embeds target indices, runs a GRU from a given hidden state and scores the vocabulary.

    Args:
        output_dim: target vocabulary size (embedding rows and width of the predictions).
        embed_dim: size of each embedding vector.
        hidden_dim: size of the GRU hidden state.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=embed_dim)
        self.rnn = nn.GRU(input_size=embed_dim, hidden_size=hidden_dim,
                          num_layers=n_layers, batch_first=True)
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, tgt, hidden):
        """Return `(predictions, hidden)` for the batch-first index tensor `tgt`.

        `predictions` holds one score per vocabulary entry for every position of
        `tgt`; `hidden` is the GRU state after consuming `tgt`.
        """
        rnn_outputs, next_hidden = self.rnn(self.embedding(tgt), hidden)
        return self.fc_out(rnn_outputs), next_hidden


# Initialize vocabulary, DecoderRNN, and input
vocab = Vocabulary()
vocab.add_sentence(["I", "am", "learning"])
decoder = DecoderRNN(output_dim=len(vocab.word2idx), embed_dim=8, hidden_dim=16, n_layers=2)
tgt = torch.tensor([1, 4, 5, 6, 2]).unsqueeze(0)  # Example target sequence (batch size 1)
hidden = torch.zeros(2, 1, 16)  # Initial hidden state (num layers 2, batch size 1, hidden_dim 16)

# Test 1: Check the shape of predictions and hidden state returned by the forward pass
# Predictions carry one score per vocabulary entry for each of the 5 target positions.
predictions, hidden_out = decoder(tgt, hidden)
assert predictions.shape == (1, 5, len(vocab.word2idx)), "Test 1 failed: Predictions shape is incorrect."
assert hidden_out.shape == (2, 1, 16), "Test 1 failed: Hidden state shape is incorrect."

# Test 2: Check if the output predictions are a tensor
assert isinstance(predictions, torch.Tensor), "Test 2 failed: Predictions should be a torch tensor."

# Test 3: Ensure the hidden state is updated correctly
# NOTE(review): this only checks object identity, not that the values changed.
assert hidden_out is not hidden, "Test 3 failed: Hidden state should be updated after passing through GRU."

# Test 4: Check for consistent output size with a different target sequence
tgt_2 = torch.tensor([1, 3, 2]).unsqueeze(0)
predictions_2, hidden_out_2 = decoder(tgt_2, hidden)
assert predictions_2.shape == (1, 3, len(vocab.word2idx)), "Test 4 failed: Predictions shape is incorrect for different target."


### 2.3 Sequence to Sequence

👍 Process the source sequence (`src`) through the encoder and the target sequence (`tgt`) through the decoder, using the hidden state from the encoder. Return the decoder's output.

Implement `forward` method. First, pass the source sequence (`src`) through the encoder to obtain the hidden state. Then, pass the target sequence (`tgt`) and the hidden state to the decoder. The decoder will output the predictions and the updated hidden state, but only the predictions should be returned by the `forward` method.

In [None]:
# Seq2Seq Model
class Seq2Seq(nn.Module):
    """Chains an encoder and a decoder: the encoder's result seeds the decoder."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        """Encode `src`, decode `tgt` from the resulting state, and return the decoder's predictions.

        The decoder's updated hidden state is discarded.
        """
        context = self.encoder(src)
        predictions, _ = self.decoder(tgt, context)
        return predictions
    

# Initialize vocabulary, EncoderRNN, DecoderRNN, and Seq2Seq model
# The same toy vocabulary is used for both the source and target side here.
vocab = Vocabulary()
vocab.add_sentence(["I", "am", "learning"])
encoder = EncoderRNN(input_dim=len(vocab.word2idx), embed_dim=8, hidden_dim=16, n_layers=2)
decoder = DecoderRNN(output_dim=len(vocab.word2idx), embed_dim=8, hidden_dim=16, n_layers=2)
model = Seq2Seq(encoder, decoder)
src = torch.tensor([1, 4, 5, 6, 2]).unsqueeze(0)  # Example source sequence (batch size 1)
tgt = torch.tensor([1, 4, 5, 6, 2]).unsqueeze(0)  # Example target sequence (batch size 1)

# Test 1: Check the shape of the outputs returned by the forward pass
# One score per vocabulary entry for each of the 5 target positions.
outputs = model(src, tgt)
assert outputs.shape == (1, 5, len(vocab.word2idx)), "Test 1 failed: Output shape is incorrect."

# Test 2: Check if the output is a tensor
assert isinstance(outputs, torch.Tensor), "Test 2 failed: Output should be a torch tensor."

# Test 3: Ensure the forward pass works with different sequences
src_2 = torch.tensor([1, 3, 2]).unsqueeze(0)
tgt_2 = torch.tensor([1, 3, 2]).unsqueeze(0)
outputs_2 = model(src_2, tgt_2)
assert outputs_2.shape == (1, 3, len(vocab.word2idx)), "Test 3 failed: Output shape is incorrect for different sequence."

# Test 4: Check the shape of the encoder hidden state with 2 layers
# NOTE(review): `hidden_2_layers` is never read after this assignment.
hidden_2_layers = torch.zeros(2, 1, 16)  # Initial hidden state for 2 layers
hidden = encoder(src)
assert hidden.shape == (2, 1, 16), "Test 4 failed: Hidden state shape is incorrect for 2 layers."


## 3 Train

### 3.1 Data Initialization

This code prepares the data for training a machine translation model. It first checks if CUDA (GPU support) is available for faster computation. Then, it defines a small dummy dataset with English and French sentences. Vocabulary objects are created for both the source (English) and target (French) languages, and words are added to the vocabularies using the `add_sentence` method. A `TranslationDataset` is created, which pairs the English and French sentences together. A DataLoader is used to load the data in batches, and a custom collate function is defined to pad the sentences to equal lengths, ensuring they can be processed in batches.



In [None]:
# Use CUDA if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dummy corpus for demonstration purposes
# The two lists are parallel: entry i of one is the translation of entry i of the other.
english_sentences = [
    "hello how are you",
    "what is your name",
    "I am learning machine translation",
    "this is an example sentence"
]
french_sentences = [
    "bonjour comment allez-vous",
    "quel est votre nom",
    "j'apprends la traduction automatique",
    "c'est une phrase exemple"
]

# Vocabulary and Dataset Preparation
# Separate vocabularies: source (English) and target (French) indices are unrelated.
src_vocab = Vocabulary()
tgt_vocab = Vocabulary()

# Build vocabularies
for sentence in english_sentences:
    src_vocab.add_sentence(tokenize(sentence))
for sentence in french_sentences:
    tgt_vocab.add_sentence(tokenize(sentence))

# Dataset
dataset = TranslationDataset(english_sentences, french_sentences, src_vocab, tgt_vocab)
# `collate_fn` is defined further down in this cell; the lambda only looks the name up
# when a batch is actually built, so referencing it here before its definition works.
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=lambda batch: collate_fn(batch, tgt_vocab))

# Collate function to pad sequences
def collate_fn(batch, tgt_vocab):
    """Pad a list of (source, target) tensor pairs into two batch-first tensors.

    Targets are padded with `tgt_vocab.pad_idx`. Sources are padded with the
    `pad_idx` of the notebook-global `src_vocab`, which is read at call time.
    """
    sources, targets = zip(*batch)
    pad = nn.utils.rnn.pad_sequence
    return (
        pad(sources, batch_first=True, padding_value=src_vocab.pad_idx),
        pad(targets, batch_first=True, padding_value=tgt_vocab.pad_idx),
    )


### 3.2 Model initialization

This code defines the hyperparameters and initializes the model for training. The hyperparameters include the input and output dimensions (based on the source and target vocabularies), the embedding dimension, hidden dimension, number of layers in the encoder and decoder, learning rate, and the number of epochs for training. The encoder and decoder models are created using the `EncoderRNN` and `DecoderRNN` classes, respectively, and combined into a sequence-to-sequence (`Seq2Seq`) model. The loss function used is cross-entropy, which ignores padding tokens in the target sequences, and the optimizer is Adam with a specified learning rate. The model and all components are moved to the available device (GPU or CPU).

In [None]:
# Hyperparameters
# Vocabulary sizes fix the embedding table sizes and the width of the decoder's output layer.
INPUT_DIM = len(src_vocab.word2idx)
OUTPUT_DIM = len(tgt_vocab.word2idx)
EMBED_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 2
LEARNING_RATE = 0.001
NUM_EPOCHS = 20

# Encoder and Decoder
# Both use the same HIDDEN_DIM and N_LAYERS, so the encoder's hidden state can be
# passed to the decoder's GRU unchanged.
encoder = EncoderRNN(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS).to(device)
decoder = DecoderRNN(OUTPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS).to(device)
model = Seq2Seq(encoder, decoder).to(device)

# Loss and optimizer
# Positions whose target is <pad> are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.pad_idx)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)


### 3.3 Training

The `train_model` function trains the model for a specified number of epochs. For each batch in the dataloader, it moves the source and target sequences to the device and resets gradients. It prepares the target sequence for teacher forcing by splitting it into `tgt_input` (used as input) and `tgt_output` (the expected output). After performing a forward pass through the model, the loss is calculated by comparing the predicted output with the actual target. The loss is backpropagated, and the optimizer updates the model’s parameters. The epoch loss is printed after each epoch.

**Teacher forcing** is a technique used during the training of sequence-to-sequence models where the model is fed the true output (from the training data) at each step of the decoding process, rather than using its own previous predictions. This helps the model learn the correct sequence of outputs more quickly by reducing the accumulation of errors in long sequences.

Consider a model translating an English sentence "I am learning" to French. During training:

1. **Without teacher forcing**: The model predicts the first word ("Je") and then uses that prediction ("Je") as input to predict the next word ("suis"). However, if the first prediction is wrong, the error compounds, making it harder to predict subsequent words correctly.

2. **With teacher forcing**: Instead of using the model's predicted word ("Je"), the true word ("Je") from the training data is given as input for predicting the next word ("suis"). This reduces errors because the model is always given the correct word during training, making learning faster and more stable.

In [None]:
# Training function
def train_model(model, dataloader, optimizer, criterion, num_epochs=10):
    """Train `model` with teacher forcing and print the summed batch loss per epoch.

    Args:
        model: callable as `model(src, tgt_input)`, returning per-position scores.
        dataloader: yields `(src, tgt)` batches of padded index tensors.
        optimizer: optimizer over the model's parameters.
        criterion: loss taking flattened scores and flattened target indices.
        num_epochs: number of passes over `dataloader`.

    Batches are moved to the notebook-global `device`.
    """
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0
        for src_batch, tgt_batch in dataloader:
            src_batch = src_batch.to(device)
            tgt_batch = tgt_batch.to(device)
            optimizer.zero_grad()

            # Teacher forcing: the decoder reads every token but the last and
            # must predict every token but the first (the sequence shifted by one).
            decoder_input = tgt_batch[:, :-1]
            expected = tgt_batch[:, 1:].reshape(-1)

            # Flatten (batch, positions, vocab) scores to (batch * positions, vocab)
            # so they line up with the flattened targets.
            scores = model(src_batch, decoder_input)
            flat_scores = scores.reshape(-1, scores.size(-1))

            loss = criterion(flat_scores, expected)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch + 1}, Loss: {epoch_loss:.4f}")

# Run training
# Trains the notebook-global `model` in place on the toy corpus for NUM_EPOCHS epochs.
train_model(model, dataloader, optimizer, criterion, NUM_EPOCHS)

## 4 Test

### 4.1 Translation

👍 Implement the `prepare_input` function, which should take a sentence, tokenize it, convert the tokens into indices using the source vocabulary, and return a tensor of the indices. The tensor should have a batch dimension (i.e., the sentence should be wrapped in a list). Ensure the tensor is placed on the correct device (CPU or CUDA).

In [None]:
def prepare_input(sentence, src_vocab, device):
    """Turn a raw sentence into a batch-of-one index tensor on `device`.

    The sentence is tokenized with `tokenize` and encoded with
    `src_vocab.sentence_to_indices`; the result is a `torch.long` tensor
    with a leading batch dimension of size 1.
    """
    indices = src_vocab.sentence_to_indices(tokenize(sentence))
    batch_of_one = [indices]
    return torch.tensor(batch_of_one, dtype=torch.long, device=device)

# Create a mock source vocabulary
# It gets its own name on purpose: rebinding the global `src_vocab` here would make
# later cells encode sentences with this 4-word mock instead of the vocabulary the
# model was trained with.
mock_src_vocab = Vocabulary()
mock_src_vocab.add_sentence(tokenize("hello how are you"))

# Prepare input sentence
sentence = "hello how are you"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
src_tensor = prepare_input(sentence, mock_src_vocab, device)

# Assert that the tensor has the correct shape and device
assert src_tensor.shape == torch.Size([1, 6])  # batch size of 1, 6 tokens (2 special tokens)
# Compare device types: a CUDA tensor reports "cuda:0", which is not equal to a bare "cuda" device.
assert src_tensor.device.type == device.type  # Check that the tensor is on the correct device
assert src_tensor[0, 0].item() == mock_src_vocab.sos_idx  # First token should be <sos>
assert src_tensor[0, -1].item() == mock_src_vocab.eos_idx  # Last token should be <eos>


The `encode_input` function takes the source sentence tensor (`src_tensor`), moves it to the specified device (CPU or GPU), and then passes it through the encoder of the model to get the hidden state. The `torch.no_grad()` context is used to disable gradient computation, as the function is used during inference (not training). The function then returns the hidden state generated by the encoder.

In [None]:
def encode_input(model, src_tensor, device):
    """Run `model.encoder` on `src_tensor` (moved to `device`) without tracking gradients."""
    on_device = src_tensor.to(device)
    with torch.no_grad():
        return model.encoder(on_device)

👍 Implement the `decode_sentence` function, which decodes a sequence of tokens from the target vocabulary using a trained model. The function should start with the `<sos>` token, then iteratively predict the next token based on the model's outputs until either the maximum length is reached or the `<eos>` token is generated. The function should return a list of token indices.

1. Initialize the target sequence with the `<sos>` token.
2. For each step (up to `max_len`):
   - Feed the current target sequence into the decoder.
   - Get the decoder's output and hidden state.
   - Predict the next token by taking the argmax of the output.
   - Append the predicted token to the target sequence.
   - Stop if the `<eos>` token is generated.
3. Return the sequence of predicted target token indices.

In [None]:
def decode_sentence(model, tgt_vocab, hidden, max_len=50):
    """Greedily decode a target sequence starting from `hidden`.

    Args:
        model: object whose `decoder(tgt, hidden)` returns `(scores, hidden)`,
            with `scores` shaped (batch, positions, vocabulary).
        tgt_vocab: provides `sos_idx` and `eos_idx`.
        hidden: initial decoder state (typically the encoder's hidden state)
            for a batch of one sentence.
        max_len: maximum number of tokens to generate after `<sos>`.

    Returns:
        List of token indices beginning with `<sos>`. It ends with `<eos>` unless
        `max_len` tokens were generated first, so its length is at most `max_len + 1`.
    """
    # Build step inputs on the same device as the recurrent state instead of
    # depending on a notebook-global `device`.
    step_device = hidden.device
    tgt_indices = [tgt_vocab.sos_idx]
    with torch.no_grad():
        for _ in range(max_len):
            # `hidden` already summarises every token consumed so far, so only the
            # newest token is fed. Re-feeding the whole prefix together with the
            # carried-over state would process earlier tokens again at every step,
            # which does not match how the decoder is run during training.
            step_input = torch.tensor([[tgt_indices[-1]]], dtype=torch.long, device=step_device)
            output, hidden = model.decoder(step_input, hidden)
            next_token = output[:, -1].argmax(dim=-1).item()
            tgt_indices.append(next_token)
            if next_token == tgt_vocab.eos_idx:
                break
    return tgt_indices

# Prepare mock input for testing
# The trained model's own `tgt_vocab` is used as-is: rebinding `tgt_vocab` to a smaller
# mock would leave later cells decoding with a vocabulary that lacks indices the
# model's output layer can produce.
DECODE_TEST_MAX_LEN = 10

# Assume model is a pre-trained Seq2Seq model with an encoder and decoder
# Assume 'hidden' is the hidden state from the encoder (mocked here for testing purposes)
# Mock hidden state (num layers, batch size 1, hidden dim), created on the model's device
hidden = torch.zeros(N_LAYERS, 1, HIDDEN_DIM, device=device)

# Call the decode_sentence function
tgt_indices = decode_sentence(model, tgt_vocab, hidden, max_len=DECODE_TEST_MAX_LEN)

# Assert that the output is a list of token indices with the expected boundaries
assert isinstance(tgt_indices, list)
assert tgt_indices[0] == tgt_vocab.sos_idx  # Should start with <sos> token
assert len(tgt_indices) <= DECODE_TEST_MAX_LEN + 1  # <sos> plus at most max_len generated tokens
# Decoding stops at <eos> or when the length cap is reached, whichever comes first.
assert tgt_indices[-1] == tgt_vocab.eos_idx or len(tgt_indices) == DECODE_TEST_MAX_LEN + 1


The `translate_indices_to_sentence` function takes a list of target token indices (`tgt_indices`) and converts them back into a sentence using the `tgt_vocab`. It excludes the `<sos>` token (since it's used only at the beginning) and returns the translated sentence as a string of words joined by spaces.

In [None]:
def translate_indices_to_sentence(tgt_indices, tgt_vocab):
    """Render decoded indices as a space-separated string.

    The first index (the `<sos>` position) is dropped before the remainder is
    passed to `tgt_vocab.indices_to_sentence`.
    """
    without_sos = tgt_indices[1:]
    words = tgt_vocab.indices_to_sentence(without_sos)
    return " ".join(words)


The `translate_sentence` function translates a given input sentence using the trained model. First, it prepares the input sentence by tokenizing and converting it into tensor format. Then, it encodes the input sentence using the model's encoder. Next, the sentence is decoded step by step to generate the output tokens. Finally, the generated token indices are converted back into a human-readable sentence. The function returns the translated sentence as a string.

In [None]:
def translate_sentence(sentence, model, src_vocab, tgt_vocab, device, max_len=50):
    """Translate one raw sentence and return the result as a string.

    Pipeline: `prepare_input` -> `encode_input` -> `decode_sentence` ->
    `translate_indices_to_sentence`. The model is switched to eval mode and
    left in that mode.
    """
    model.eval()

    # Encode the source sentence into the decoder's starting state.
    encoder_state = encode_input(model, prepare_input(sentence, src_vocab, device), device)

    # Generate target indices, then map them back to words.
    predicted_indices = decode_sentence(model, tgt_vocab, encoder_state, max_len)
    return translate_indices_to_sentence(predicted_indices, tgt_vocab)


source_sentence = "hello how are you"
translated_sentence = translate_sentence(source_sentence, model, src_vocab, tgt_vocab, device)
# Print the sentence that was actually translated rather than the unrelated global
# `sentence`, which holds whatever an earlier cell last assigned to it.
print("source sentence:", source_sentence)
print("translated sentence:", translated_sentence)


### 4.2 Evaluation

This function evaluates the performance of a trained sequence-to-sequence model on a given dataset. It calculates the average loss and BLEU score across all sentences in the dataset. First, it performs a forward pass to compute the loss for each batch using the provided criterion. Then, for each sentence, it decodes the source sentence, compares the generated translation with the reference translation, and computes the BLEU score. Finally, it returns the average loss and average BLEU score for the entire dataset.

In [None]:
from nltk.translate.bleu_score import sentence_bleu

def evaluate_model(model, dataloader, criterion, src_vocab, tgt_vocab, device, max_len=50):
    """Compute the average teacher-forced loss and average sentence BLEU over `dataloader`.

    Args:
        model: callable as `model(src, tgt_input)`; also passed to `translate_sentence`.
        dataloader: yields `(src, tgt)` batches of padded index tensors.
        criterion: loss taking flattened scores and flattened target indices.
        src_vocab, tgt_vocab: vocabularies used to map indices back to words.
        device: device the batches are moved to.
        max_len: maximum number of tokens generated per translation.

    Returns:
        `(avg_loss, avg_bleu_score)`: loss averaged over batches and BLEU averaged
        over sentences. Either value is NaN when there was nothing to average.
    """
    model.eval()
    total_loss = 0
    total_bleu_score = 0
    total_sentences = 0

    # Tokens that only mark sequence structure and are not part of the sentence text.
    src_structural = (src_vocab.pad_idx, src_vocab.sos_idx, src_vocab.eos_idx)

    with torch.no_grad():
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)

            # Prepare inputs for evaluation (same one-token shift as in training)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            # Forward pass to compute loss
            outputs = model(src, tgt_input)
            outputs = outputs.reshape(-1, outputs.size(-1))
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(outputs, tgt_output)
            total_loss += loss.item()

            # Decode and compute BLEU score for each sentence
            for src_indices, tgt_indices in zip(src.tolist(), tgt.tolist()):
                # Rebuild the raw source text WITHOUT <sos>/<eos>/<pad>:
                # `translate_sentence` re-encodes the text and adds <sos>/<eos> itself,
                # so keeping them here would feed the encoder doubled boundary tokens.
                src_tokens = [src_vocab.idx2word[idx] for idx in src_indices
                              if idx not in src_structural]
                reference = [tgt_vocab.indices_to_sentence(tgt_indices)]
                hypothesis = translate_sentence(" ".join(src_tokens), model, src_vocab,
                                                tgt_vocab, device, max_len).split()

                # Compute BLEU score
                total_bleu_score += sentence_bleu(reference, hypothesis)
                total_sentences += 1

    # An empty loader yields NaN instead of a ZeroDivisionError.
    num_batches = len(dataloader)
    avg_loss = total_loss / num_batches if num_batches else float("nan")
    avg_bleu_score = total_bleu_score / total_sentences if total_sentences else float("nan")
    return avg_loss, avg_bleu_score


# Example: Create a validation DataLoader
# Each tuple is a (source, target) sentence pair.
validation_sentences = [
    ("hello how are you", "bonjour comment allez-vous"),
    ("what is your name", "quel est votre nom")
]
# Transpose the list of pairs into one tuple of sources and one tuple of targets.
val_src_sentences, val_tgt_sentences = zip(*validation_sentences)

# Sentences are encoded with whatever `src_vocab` / `tgt_vocab` are bound to when this cell runs.
validation_dataset = TranslationDataset(val_src_sentences, val_tgt_sentences, src_vocab, tgt_vocab)
validation_dataloader = DataLoader(validation_dataset, batch_size=2, shuffle=False, collate_fn=lambda batch: collate_fn(batch, tgt_vocab))

# Evaluate the model
avg_loss, avg_bleu_score = evaluate_model(model, validation_dataloader, criterion, src_vocab, tgt_vocab, device)
print(f"Validation Loss: {avg_loss:.4f}")
print(f"Average BLEU Score: {avg_bleu_score:.4f}")


# Do Everything on a Larger Dataset

## Data

In [None]:
import pandas as pd

machine_translation_df = pd.read_csv("eng_fra_simplest.csv")

# Columns that are bookkeeping rather than data: the first CSV column
# (presumably a saved row index - TODO confirm) and the `split` label itself.
# They are dropped from every split so train/val/test share the same columns
# (previously only the training frame dropped `split`).
_bookkeeping_columns = [machine_translation_df.columns[0], "split"]

train_df, val_df, test_df = (
    machine_translation_df[machine_translation_df["split"] == split_name]
    .drop(columns=_bookkeeping_columns)
    for split_name in ("train", "val", "test")
)

train_df

## Training

In [None]:
# Parallel training sentences taken from the `eng` and `fra` columns.
english_sentences = list(train_df.eng)
french_sentences = list(train_df.fra)

# Fresh vocabularies: these rebind the notebook-global `src_vocab` / `tgt_vocab`
# used by the toy example above.
src_vocab = Vocabulary()
tgt_vocab = Vocabulary()

# Build vocabularies
# Only training sentences contribute words; anything else maps to <unk> when encoded.
for sentence in english_sentences:
    src_vocab.add_sentence(tokenize(sentence))
for sentence in french_sentences:
    tgt_vocab.add_sentence(tokenize(sentence))

# Dataset
BATCH_SIZE = 32
dataset = TranslationDataset(english_sentences, french_sentences, src_vocab, tgt_vocab)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=lambda batch: collate_fn(batch, tgt_vocab))

# Hyperparameters
# Same values as for the toy corpus, except the vocabulary-derived dimensions.
INPUT_DIM = len(src_vocab.word2idx)
OUTPUT_DIM = len(tgt_vocab.word2idx)
EMBED_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 2
LEARNING_RATE = 0.001
NUM_EPOCHS = 20

# Encoder and Decoder
# A new model is created here; the one trained on the toy corpus is replaced.
encoder = EncoderRNN(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS).to(device)
decoder = DecoderRNN(OUTPUT_DIM, EMBED_DIM, HIDDEN_DIM, N_LAYERS).to(device)
model = Seq2Seq(encoder, decoder).to(device)

# Loss and optimizer
# Positions whose target is <pad> are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.pad_idx)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Run training
train_model(model, dataloader, optimizer, criterion, NUM_EPOCHS)

## Evaluation

In [None]:

# Validation sentences; note this rebinds the `english_sentences` / `french_sentences`
# globals that previously held the training sentences.
english_sentences = list(val_df.eng)
french_sentences = list(val_df.fra)

# Encoded with the vocabularies built from the training split.
validation_dataset = TranslationDataset(english_sentences, french_sentences, src_vocab, tgt_vocab)
validation_dataloader = DataLoader(validation_dataset, batch_size=2, shuffle=False, collate_fn=lambda batch: collate_fn(batch, tgt_vocab))

# Evaluate the model
avg_loss, avg_bleu_score = evaluate_model(model, validation_dataloader, criterion, src_vocab, tgt_vocab, device)
print(f"Validation Loss: {avg_loss:.4f}")
print(f"Average BLEU Score: {avg_bleu_score:.4f}")