# Sequence to Sequence Learning with Neural Networks

## 1. Libraries

In [39]:
import os
import re
from typing import List

import spacy
import torch
import torchtext
import pandas as pd
from torch import nn
from torch.utils.data import random_split
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import Dataset, DataLoader


# Controlling the randomness in PyTorch.
# cuDNN autotuning (`benchmark = True`) may pick a different algorithm from one
# run to the next, which defeats the fixed seed below, so it is switched off
# and cuDNN is restricted to deterministic algorithms instead.
RANDOM_SEED = 0
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.manual_seed(RANDOM_SEED)
# Seed every visible GPU, not just the current one (ignored without CUDA).
torch.cuda.manual_seed_all(RANDOM_SEED)

## 2. The Dataset
I've used a Kaggle dataset. You can download it from [here](https://www.kaggle.com/datasets/devicharith/language-translation-englishfrench).

In [2]:
# Relative location of the Kaggle CSV file.
DATASET_PATH = os.path.join("..", "..", "nlp", "datasets", "en-fr-translation", "en-fr.csv")

# Read the sentence pairs and shorten both verbose column headers in one call.
df = pd.read_csv(DATASET_PATH).rename(
    columns={
        "English words/sentences": "EN",
        "French words/sentences": "FR",
    }
)

df.head()

Unnamed: 0,EN,FR
0,Hi.,Salut!
1,Run!,Cours !
2,Run!,Courez !
3,Who?,Qui ?
4,Wow!,Ça alors !


In [3]:
# Both spaCy pipelines must be installed before running this cell, e.g.
#   python -m spacy download en_core_web_sm
#   python -m spacy download fr_core_news_sm
en_tokenizer = get_tokenizer("spacy", language="en_core_web_sm")
fr_tokenizer = get_tokenizer("spacy", language="fr_core_news_sm")

In [4]:
# Runs of sentence punctuation, and runs of whitespace, respectively.
_PUNCTUATION_RUN = re.compile(r"([.,!?:;]+)")
_WHITESPACE_RUN = re.compile(r"\s+")


def prepare_sentence(sentence: str):
    """Separate punctuation from the neighbouring words.

    Every run of ``. , ! ? : ;`` is surrounded with spaces, then every run of
    whitespace is collapsed into a single space. The result is not stripped,
    so a sentence ending in punctuation keeps one trailing space.
    """
    spaced = _PUNCTUATION_RUN.sub(r" \1 ", sentence)
    return _WHITESPACE_RUN.sub(" ", spaced)


def iterate_corpus(corpus: List[str], tokenizer: spacy.tokenizer.Tokenizer, max_len: int):
    """Yield one fixed-length token list per sentence of ``corpus``.

    Each sentence goes through ``prepare_sentence`` and ``tokenizer``; the
    resulting tokens are cut to ``max_len`` items or right-padded with
    ``"<pad>"`` until they reach that length.
    """
    for raw_sentence in corpus:
        words = tokenizer(prepare_sentence(raw_sentence))
        shortfall = max_len - len(words)

        if shortfall > 0:
            # Too short: fill the remaining slots with the padding token.
            yield words + ["<pad>"] * shortfall
        else:
            # Long enough (or exactly right): keep the first max_len tokens.
            yield words[:max_len]


# Raw sentences of each language as plain Python lists.
en_corpus = list(df["EN"])
fr_corpus = list(df["FR"])
EN_MAX_LEN = 200
FR_MAX_LEN = 200


def _build_vocab(corpus, tokenizer, max_len):
    """Build a vocabulary from ``corpus`` in which unknown tokens map to ``<unk>``."""
    vocab = build_vocab_from_iterator(
        iterate_corpus(corpus, tokenizer, max_len),
        specials=["<unk>", "<start>", "<end>", "<pad>"],
    )
    vocab.set_default_index(vocab["<unk>"])
    return vocab


en_vocab = _build_vocab(en_corpus, en_tokenizer, EN_MAX_LEN)
fr_vocab = _build_vocab(fr_corpus, fr_tokenizer, FR_MAX_LEN)

In [5]:
class TranslationDataset(Dataset):
    """Parallel corpus stored as two fixed-length tensors of token indices.

    ``self.x`` holds the first language and ``self.y`` the second; row ``i``
    of each tensor encodes sentence ``i`` of the matching corpus. Both tensors
    are built once, in the constructor.

    Args:
        lang1_corpus, lang2_corpus: Aligned lists of sentences (same length).
        lang1_tokenizer, lang2_tokenizer: Callables that turn a sentence into
            a list of string tokens.
        lang1_vocab, lang2_vocab: Token -> index mappings that contain the
            ``"<pad>"`` token.
        lang1_max_len, lang2_max_len: Number of indices kept per sentence.

    Raises:
        ValueError: If the two corpora do not have the same number of
            sentences.
    """

    def __init__(
        self, 
        lang1_corpus: List[str], lang2_corpus: List[str],
        lang1_tokenizer: spacy.tokenizer.Tokenizer, lang2_tokenizer: spacy.tokenizer.Tokenizer,
        lang1_vocab: torchtext.vocab.Vocab, lang2_vocab: torchtext.vocab.Vocab,
        lang1_max_len: int = 200, lang2_max_len: int = 200
    ):
        # Sentence i of one corpus is the translation of sentence i of the
        # other, so a length mismatch means the pairs are misaligned.
        if len(lang1_corpus) != len(lang2_corpus):
            raise ValueError(
                "lang1_corpus and lang2_corpus must contain the same number of "
                f"sentences, got {len(lang1_corpus)} and {len(lang2_corpus)}"
            )

        self.l1_corpus = lang1_corpus
        self.l2_corpus = lang2_corpus

        self.l1_max_len = lang1_max_len
        self.l2_max_len = lang2_max_len

        self.l1_tokenizer = lang1_tokenizer
        self.l2_tokenizer = lang2_tokenizer

        self.l1_vocab = lang1_vocab
        self.l2_vocab = lang2_vocab

        self.x, self.y = self._get_x_y()

    def __getitem__(self, idx: int):
        """Return the ``(x, y)`` index tensors of sentence pair ``idx``."""
        return self.x[idx], self.y[idx]

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.y)

    def _get_x_y(self):
        """Encode both corpora and return them as ``(x, y)``."""
        x = TranslationDataset._parse_corpus(
            self.l1_corpus, self.l1_tokenizer, self.l1_vocab, self.l1_max_len
        )
        y = TranslationDataset._parse_corpus(
            self.l2_corpus, self.l2_tokenizer, self.l2_vocab, self.l2_max_len
        )

        return x, y

    @staticmethod
    def _parse_corpus(
        corpus: List[str], 
        tokenizer: spacy.tokenizer.Tokenizer, 
        vocab: torchtext.vocab.Vocab,
        max_len: int
    ):
        """Encode ``corpus`` as a ``(len(corpus), max_len)`` long tensor.

        Every sentence is normalised with ``prepare_sentence``, tokenised,
        mapped to vocabulary indices, cut to ``max_len`` and right-padded
        with the index of ``"<pad>"``.
        """
        pad_index = vocab["<pad>"]
        rows = []

        for sent in corpus:
            # Apply the same normalisation that `iterate_corpus` uses while
            # the vocabulary is built; otherwise tokens that only exist in
            # the raw text (e.g. "Mr.") are looked up and fall back to <unk>.
            tokens = tokenizer(prepare_sentence(sent))
            indices = [vocab[token] for token in tokens][:max_len]
            indices.extend([pad_index] * (max_len - len(indices)))
            rows.append(indices)

        if not rows:
            # Keep the tensor two-dimensional even when there is no sentence.
            return torch.empty((0, max_len), dtype=torch.long)

        return torch.tensor(rows, dtype=torch.long)

# Encode the whole English/French corpus once; the index tensors are kept on
# the dataset object as `x` (English) and `y` (French).
dataset = TranslationDataset(
    en_corpus, fr_corpus,
    en_tokenizer, fr_tokenizer,
    en_vocab, fr_vocab,
    lang1_max_len=EN_MAX_LEN,
    lang2_max_len=FR_MAX_LEN,
)
print("x.shape:", dataset.x.shape)
print("y.shape:", dataset.y.shape)

x.shape: torch.Size([175621, 200])
y.shape: torch.Size([175621, 200])


In [44]:
def train_validation_split(dataset: torch.utils.data.Dataset, train_size: float):
    """Randomly split ``dataset`` into a training and a validation subset.

    ``train_size`` is the fraction of samples assigned to the training
    subset (rounded down); every remaining sample goes to validation.
    Returns ``(train_subset, valid_subset)``.
    """
    total = len(dataset)
    n_train = int(total * train_size)
    n_valid = total - n_train

    train_part, valid_part = random_split(dataset, [n_train, n_valid])
    return train_part, valid_part


# Keep 90% of the sentence pairs for training; the rest is held out.
train_dataset, valid_dataset = train_validation_split(dataset, train_size=0.9)
print("Training dataset size:", len(train_dataset))
print("Validation dataset size:", len(valid_dataset))

Training dataset size: 158058
Validation dataset size: 17563


## 3. Seq2Seq Model

### 3.1. Encoder

In [29]:
class Encoder(nn.Module):
    """Embeds token indices and feeds the embeddings to a multi-layer LSTM.

    Args:
        vocab_size: Number of rows of the embedding table.
        embed_size: Size of each embedding vector (the LSTM input size).
        hidden_size: Size of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        padding_idx: Index passed to ``nn.Embedding`` as ``padding_idx``.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, padding_idx):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embed = nn.Embedding(vocab_size, embed_size, padding_idx=padding_idx)
        self.lstm = nn.LSTM(
            input_size=embed_size, 
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

    def forward(self, x: torch.Tensor, hidden: tuple):
        """Encode a batch of index sequences.

        Args:
            x: Integer tensor of token indices, batch dimension first.
            hidden: ``(h, c)`` initial LSTM state, e.g. from
                ``get_init_hidden``.

        Returns:
            ``(out, (h_n, c_n))`` exactly as returned by ``nn.LSTM``.
        """
        embedded_x = self.embed(x)
        out, hidden = self.lstm(embedded_x, hidden)

        return out, hidden

    def get_init_hidden(self, batch_size):
        """Return an all-zero ``(h, c)`` state for ``batch_size`` sequences.

        Both tensors have shape ``(num_layers, batch_size, hidden_size)`` and
        are created on the same device and with the same dtype as the
        module's parameters.
        """
        # `torch.Tensor(size=...)` only allocates memory, so the state would
        # start from arbitrary (possibly NaN/inf) values; zeros are explicit.
        reference = self.embed.weight
        shape = (self.num_layers, batch_size, self.hidden_size)
        h = torch.zeros(shape, dtype=reference.dtype, device=reference.device)
        c = torch.zeros(shape, dtype=reference.dtype, device=reference.device)

        return (h, c)


# Smoke test: 10 sequences of 5 random token indices through a 4-layer encoder.
x = torch.randint(low=0, high=20, size=(10, 5))
enc = Encoder(
    vocab_size=50,
    embed_size=100,
    hidden_size=100,
    num_layers=4,
    padding_idx=0,
)
h, c = enc.get_init_hidden(10)
out, hidden = enc(x, hidden=(h, c))
print(f"Encoder output shape: {out.shape}")

Encoder output shape: torch.Size([10, 5, 100])


### 3.2. Decoder

In [47]:
class Decoder(nn.Module):
    """LSTM followed by a linear projection onto the target vocabulary.

    There is no embedding layer: ``forward`` expects feature vectors of size
    ``input_size`` (batch dimension first) and returns one score per
    vocabulary entry for every time step.
    """

    def __init__(self, vocab_size, input_size, hidden_size, num_layers):
        super().__init__()

        lstm_config = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.lstm = nn.LSTM(**lstm_config)
        self.project = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        """Return ``(scores, state)`` for the inputs ``x`` and LSTM state ``hidden``."""
        lstm_out, state = self.lstm(x, hidden)
        scores = self.project(lstm_out)
        return scores, state


# Encoder pass: 10 sequences of 5 random token indices.
x = torch.randint(low=0, high=20, size=(10, 5))
enc = Encoder(
    vocab_size=50,
    embed_size=100,
    hidden_size=150,
    num_layers=4,
    padding_idx=0,
)
hidden = enc.get_init_hidden(10)
out, hidden = enc(x, hidden)
print(f"Encoder output shape: {out.shape}")

# Decoder pass: it consumes the encoder outputs together with the final
# encoder state, so its input size matches the encoder's hidden size.
dec = Decoder(
    vocab_size=200,
    input_size=150,
    hidden_size=150,
    num_layers=4,
)
out, hidden = dec(out, hidden)
print(f"Decoder output shape: {out.shape}")

Encoder output shape: torch.Size([10, 5, 150])
Decoder output shape: torch.Size([10, 5, 200])


### 3.3. Seq2Seq

In [51]:
class Seq2Seq(nn.Module):
    """Chains an encoder and a decoder into one module.

    When ``reverse_input`` is true the source sequences are flipped along
    dimension 1 before encoding, the trick the paper describes as
    'extremely valuable'.
    """

    def __init__(self, encoder: torch.nn.Module, decoder: torch.nn.Module, reverse_input=True):
        super().__init__()

        self.reverse_input = reverse_input

        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x: torch.Tensor, hidden: tuple):
        """Run ``x`` through the encoder, then its outputs and state through the decoder."""
        source = torch.flip(x, dims=(1,)) if self.reverse_input else x

        encoded, enc_state = self.encoder(source, hidden)
        decoded, dec_state = self.decoder(encoded, enc_state)

        return decoded, dec_state


# Input: 10 sequences of 5 random token indices.
x = torch.randint(low=0, high=20, size=(10, 5))
print(f"Input shape: {x.shape}")

# Encoder and its initial LSTM state for a batch of 10.
enc = Encoder(
    vocab_size=50,
    embed_size=100,
    hidden_size=150,
    num_layers=4,
    padding_idx=0,
)
init_hidden = enc.get_init_hidden(10)

# Decoder whose input size equals the encoder's hidden size.
dec = Decoder(
    vocab_size=200,
    input_size=150,
    hidden_size=150,
    num_layers=4,
)

# Full model: one forward pass to check the output shapes.
model = Seq2Seq(encoder=enc, decoder=dec)
out, hidden = model(x, init_hidden)
print(f"Seq2Seq output shape: {out.shape}")
print(f"Seq2Seq hidden shape: {hidden[0].shape}")

Input shape: torch.Size([10, 5])
Seq2Seq output shape: torch.Size([10, 5, 200])
Seq2Seq hidden shape: torch.Size([4, 10, 150])


In [52]:
def init_weights(model: torch.nn.Module):
    """Re-initialise every parameter of ``model`` from U(-0.08, 0.08).

    Works both when called directly on a model and when passed to
    ``model.apply(init_weights)``; modules without parameters are left
    untouched.
    """
    # Iterate over the parameters instead of reading `model.weight`: container
    # modules and `nn.LSTM` have no `weight` attribute (LSTM parameters are
    # named `weight_ih_l0`, `bias_hh_l0`, ...), so that access would raise
    # AttributeError.
    for param in model.parameters():
        torch.nn.init.uniform_(param, a=-0.08, b=0.08)

## 4. Training

In [None]:
class Seq2SeqTrainingSession:
    """Train/validation loop for a model called as ``model(x, hidden)``.

    The model must return ``(predictions, hidden)`` where ``predictions`` has
    one score per class in its last dimension. Predictions and targets are
    flattened to ``(N, classes)`` and ``(N,)`` before they reach the loss, so
    ``loss`` is expected to be a classification loss such as
    ``nn.CrossEntropyLoss`` and predictions/targets must cover the same
    number of time steps.

    Args:
        model: The sequence-to-sequence model; it is moved to ``device``.
        loss: Loss module called as ``loss(predictions, targets)``.
        optimizer: Optimizer already bound to the model's parameters.
        epochs: Number of passes over the training data.
        l_rate: Stored for reference only; the optimizer carries the
            learning rate that is actually used.
        batch_size: Number of samples per batch for both data loaders.
        use_clipping: Whether to clip the gradient norm to ``self.clip``.
        device: Device that the model, data and hidden state are moved to.
    """

    def __init__(
        self, 
        model: torch.nn.Module, 
        loss: torch.nn.Module, 
        optimizer: torch.optim.Optimizer,
        epochs: int, l_rate: float, batch_size: int, 
        use_clipping=True,
        device="cpu"
    ):
        # The batches are moved to `device`, so the model has to live there too.
        self.model = model.to(device)
        self.loss_func = loss
        self.optimizer = optimizer

        self.epochs = epochs
        self.l_rate = l_rate
        self.batch_size = batch_size

        self.use_clipping = use_clipping
        self.clip = 5
        self.device = device

        # Set by `start`; None lets the model fall back to its own default.
        self.init_hidden = None

    def start(
        self, 
        train_dataset: torch.utils.data.Subset, 
        valid_dataset: torch.utils.data.Subset,
        initial_hidden: tuple
    ):
        """Train for ``self.epochs`` epochs and return the trained model.

        Args:
            train_dataset: Samples used for the optimisation steps.
            valid_dataset: Samples used only to report the validation loss.
            initial_hidden: State handed to the model at the start of every
                batch, laid out as ``(layers, batch, hidden)`` (a tensor or a
                tuple/list of tensors). Its batch dimension must be at least
                ``batch_size``.
        """
        self.init_hidden = initial_hidden

        train_dataloader = DataLoader(
            train_dataset, 
            self.batch_size, 
            shuffle=True, 
            num_workers=0
        )
        # Validation must read the held-out data; its order does not matter.
        valid_dataloader = DataLoader(
            valid_dataset, 
            self.batch_size, 
            shuffle=False, 
            num_workers=0
        )

        for epoch in range(self.epochs):
            train_loss = self._train_epoch(train_dataloader)
            valid_loss = self._valid_epoch(valid_dataloader)
            print(f"Epoch: {epoch + 1}, Training Loss: {train_loss:.2f}, Validation Loss: {valid_loss:.2f}")

        return self.model

    def _train_epoch(self, dataloader):
        """Run one optimisation pass and return the mean batch loss."""
        return self._run_epoch(dataloader, training=True)

    def _valid_epoch(self, dataloader):
        """Evaluate without gradient tracking and return the mean batch loss."""
        return self._run_epoch(dataloader, training=False)

    def _run_epoch(self, dataloader, training):
        """Iterate over ``dataloader`` once, updating weights only if ``training``."""
        self.model.train(training)
        total_loss = 0.0
        batch_count = 0

        with torch.set_grad_enabled(training):
            for x, y in dataloader:
                x, y = x.to(self.device), y.to(self.device)

                # Every batch starts from the initial state. Reusing the state
                # returned for the previous batch would keep its graph alive
                # (backward would fail on the second batch) and would not fit
                # a final batch that is smaller than the others.
                hidden = self._hidden_for_batch(x.size(0))
                y_pred, _ = self.model(x, hidden)

                # (batch, steps, classes) -> (batch * steps, classes) and
                # (batch, steps) -> (batch * steps,), the layout that
                # classification losses expect.
                loss = self.loss_func(
                    y_pred.reshape(-1, y_pred.size(-1)),
                    y.reshape(-1)
                )

                if training:
                    self.optimizer.zero_grad()
                    loss.backward()

                    # Clipping the gradients, since LSTMs can have exploding gradients.
                    if self.use_clipping:
                        torch.nn.utils.clip_grad_norm_(
                            self.model.parameters(), 
                            max_norm=self.clip
                        )

                    self.optimizer.step()

                total_loss += loss.item()
                batch_count += 1

        # Report the average over all batches rather than the last batch only.
        if batch_count == 0:
            return float("nan")
        return total_loss / batch_count

    def _hidden_for_batch(self, batch_size):
        """Return the initial state cut to ``batch_size`` and moved to the device."""
        if self.init_hidden is None:
            return None

        def fit(state):
            # Dimension 1 is the batch dimension of an LSTM state.
            return state[:, :batch_size].contiguous().to(self.device)

        if isinstance(self.init_hidden, (tuple, list)):
            return tuple(fit(state) for state in self.init_hidden)
        return fit(self.init_hidden)
