In [1]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from tqdm import tqdm

In [2]:
# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Hyper-Parameters
MAX_LENGTH = 25
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
NUM_LAYERS = 2
DROPOUT = 0.3
BATCH_SIZE = 64
NUM_EPOCHS = 50
LEARNING_RATE = 0.001
CLIP = 1

### Data Preprocessing

In [4]:
import pandas as pd

df = pd.read_csv("eng_spn.csv")
df.head()

Unnamed: 0.1,Unnamed: 0,English words/sentences,French words/sentences
0,0,go,ve
1,1,go,vete
2,2,go,vaya
3,3,go,vayase
4,4,hi,hola


In [5]:
input_data = df["English words/sentences"]
target_data = df["French words/sentences"].apply(lambda x: "<sos> " + x + " <eos>")

In [6]:
# Tokenize dataset
input_tokenizer = Tokenizer()
input_tokenizer.fit_on_texts(input_data)
input_sequences = input_tokenizer.texts_to_sequences(input_data)

target_tokenizer = Tokenizer()
target_tokenizer.fit_on_texts(target_data)
target_sequences = target_tokenizer.texts_to_sequences(target_data)

In [7]:
# Pad sequences
padded_input_sequences = pad_sequences(
    input_sequences, maxlen=MAX_LENGTH, padding="post"
)

padded_target_sequences = pad_sequences(
    target_sequences, maxlen=MAX_LENGTH, padding="post"
)

In [8]:
# Vocab size
input_vocab_size = len(input_tokenizer.word_index) + 1
target_vocab_size = len(target_tokenizer.word_index) + 1

In [9]:
# Convert to pytorch tensors
input_tensor = torch.tensor(padded_input_sequences, dtype=torch.long)
target_tensor = torch.tensor(padded_target_sequences, dtype=torch.long)

In [10]:
# Dataloader
dataloader = DataLoader(
    TensorDataset(input_tensor, target_tensor), batch_size=BATCH_SIZE, shuffle=True
)

### Encoder

In [11]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout):
        super(Encoder, self).__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # LSTM layer
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True,
        )

        # Dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Convert tokens to vectors
        embedded = self.embedding(x)

        # Pass the embedded vector into LSTM layer
        lstm_output, (hidden, cell) = self.lstm(embedded)

        return lstm_output, hidden, cell

### Bahdanau Attention

In [12]:
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_dim):
        super(BahdanauAttention, self).__init__()

        # Linear layer to transform query(Q), key(K) and value(V)
        self.w_Q = nn.Linear(hidden_dim, hidden_dim)
        self.w_K = nn.Linear(hidden_dim, hidden_dim)
        self.w_V = nn.Linear(hidden_dim, 1)

    def forward(self, hidden, encoder_outputs):

        # Extract the last hidden state
        hidden = hidden[-1]
        hidden = hidden.unsqueeze(1)

        # Query from decoder
        Q = self.w_Q(hidden)

        # Key from encoder
        K = self.w_K(encoder_outputs)

        # Attention scores
        attention_scores = self.w_V(torch.tanh(Q + K))

        # Attention weights
        attention_weights = torch.softmax(attention_scores, dim=-1)

        attention_weights = attention_weights.transpose(1, 2)

        # Context vector
        context_vector = torch.bmm(attention_weights, encoder_outputs)

        return context_vector

### Decoder

In [13]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout):
        super(Decoder, self).__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # LSTM layer
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True,
        )

        # Dropout
        self.dropout = nn.Dropout(dropout)

        # Attention layer
        self.attention = BahdanauAttention(hidden_dim)

        # Linear layer
        self.fc = nn.Linear(hidden_dim * 2, vocab_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        # Add batch dimension
        x = x.unsqueeze(1)

        # Convert tokens to vectors
        embedded = self.embedding(x)

        # Pass the embedded vector into LSTM layer
        lstm_output, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        # Calculate context vector
        context_vector = self.attention(hidden, encoder_outputs)

        # Concatenate lstm_output and context_vector
        concatenated = torch.cat((lstm_output, context_vector), dim=2)

        # Generate predictions for the next token
        prediction = self.fc(concatenated)

        return prediction, hidden, cell

### Seq2Seq

In [14]:
class Seq2Seq(nn.Module):
    def __init__(
        self,
        input_vocab_size,
        target_vocab_size,
        embed_dim,
        hidden_dim,
        num_layers,
        dropout,
    ):
        super(Seq2Seq, self).__init__()

        # Encoder
        self.encoder = Encoder(
            input_vocab_size, embed_dim, hidden_dim, num_layers, dropout
        )

        # Decoder
        self.decoder = Decoder(
            target_vocab_size, embed_dim, hidden_dim, num_layers, dropout
        )

    def forward(self, input, target):
        batch_size, max_length = target.size()
        target_vocab_size = self.decoder.fc.out_features

        # Tensor to store outputs for all time steps
        outputs = torch.zeros(batch_size, max_length, target_vocab_size)

        # Get encoder outputs, hidden and cell states from the encoder
        encoder_outputs, hidden, cell = self.encoder(input)

        # Start decoding with the first target token
        target_input_token = target[:, 0]

        for t in range(1, max_length):
            decoder_output, hidden, cell = self.decoder(
                target_input_token, hidden, cell, encoder_outputs
            )
            outputs[:, t, :] = decoder_output.squeeze(1)

            target_input_token = target[:, t]
            hidden = hidden
            cell = cell

        return outputs

### Training

In [15]:
# Initialize model
model = Seq2Seq(
    input_vocab_size,
    target_vocab_size,
    EMBEDDING_DIM,
    HIDDEN_DIM,
    NUM_LAYERS,
    DROPOUT,
)

In [16]:
# Checkpoint


# Save model function
def save_checkpoint(epoch, model, filename="checkpoint.pth"):
    torch.save(
        {
            "epoch": epoch + 1,
            "model_state_dict": model.state_dict(),
        },
        filename,
    )


# Load model function
def load_checkpoint(model, filename):
    checkpoint = torch.load(filename)

    start_epoch = checkpoint["epoch"]

    model.load_state_dict(checkpoint["model_state_dict"])
    return start_epoch

In [17]:
# Load model
try:
    start_epoch = load_checkpoint(model, filename="checkpoint.pth")
    print(f"Resuming training from epoch: {start_epoch}")
except FileNotFoundError:
    start_epoch = 1
    print(f"No checkpoint found, starting training from scratch...")

No checkpoint found, starting training from scratch...


  checkpoint = torch.load(filename)


In [18]:
# Initialize Adam optimizer and Loss function
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss(ignore_index=0)

In [19]:
# Train function
def train(model, optimizer, criterion, dataloader, epochs=NUM_EPOCHS):

    model.train()  # Set model to Training mode

    total_loss = 0

    for epoch in range(start_epoch, epochs + 1):
        epoch_loss = 0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch}/{epochs}")

        for input, target in progress_bar:

            # Reset gradients
            optimizer.zero_grad()

            # Forward pass
            output = model(input, target)

            # Reshape input and target to calculate loss
            output = output[:, 1:].reshape(-1, output.shape[2])
            target = target[:, 1:].reshape(-1)

            # Compute loss and backpropagation
            loss = criterion(output, target)
            loss.backward()

            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)

            # Update model parameters
            optimizer.step()

            epoch_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())

        total_loss += epoch_loss

        progress_bar.close()

        save_checkpoint(epoch, model)

    print(f"Total Loss: {total_loss/len(dataloader)}")

In [20]:
# Training
train(model, optimizer, criterion, dataloader)

Epoch 1/50: 100%|██████████| 157/157 [05:14<00:00,  2.00s/it, loss=4.22]
Epoch 2/50: 100%|██████████| 157/157 [05:06<00:00,  1.95s/it, loss=3.15]
Epoch 3/50: 100%|██████████| 157/157 [03:42<00:00,  1.42s/it, loss=2.84]
Epoch 4/50: 100%|██████████| 157/157 [02:53<00:00,  1.11s/it, loss=1.77]
Epoch 5/50: 100%|██████████| 157/157 [02:49<00:00,  1.08s/it, loss=1.63]
Epoch 6/50: 100%|██████████| 157/157 [03:04<00:00,  1.17s/it, loss=0.94] 
Epoch 7/50: 100%|██████████| 157/157 [04:44<00:00,  1.81s/it, loss=1.06] 
Epoch 8/50: 100%|██████████| 157/157 [04:45<00:00,  1.82s/it, loss=1.13] 
Epoch 9/50: 100%|██████████| 157/157 [04:50<00:00,  1.85s/it, loss=0.847]
Epoch 10/50: 100%|██████████| 157/157 [04:39<00:00,  1.78s/it, loss=0.578]
Epoch 11/50: 100%|██████████| 157/157 [03:46<00:00,  1.44s/it, loss=0.666]
Epoch 12/50: 100%|██████████| 157/157 [04:37<00:00,  1.77s/it, loss=0.65] 
Epoch 13/50: 100%|██████████| 157/157 [04:38<00:00,  1.77s/it, loss=0.598]
Epoch 14/50: 100%|██████████| 157/157 [

Total Loss: 30.33850717051014





### Prediction

In [21]:
def predict(
    model, input_text, input_tokenizer, target_tokenizer, max_length=MAX_LENGTH
):

    # Set model to evaluation mode
    model.eval()

    # Convert text to sequence
    input_sequence = input_tokenizer.texts_to_sequences([input_text])

    # Apply padding
    padded_input_sequence = pad_sequences(
        input_sequence, maxlen=max_length, padding="post"
    )

    # Convert to torch tensor
    input_tensor = torch.tensor(padded_input_sequence, dtype=torch.long)

    with torch.no_grad():
        encoder_outputs, hidden, cell = model.encoder(input_tensor)

    sos_token = target_tokenizer.word_index["sos"]

    # Start prediction with "sos" token
    x_input = torch.tensor([sos_token], dtype=torch.long)

    # List to store predicted tokens
    translated_sentence = []

    for _ in range(max_length):
        with torch.no_grad():
            prediction, hidden, cell = model.decoder(
                x_input, hidden, cell, encoder_outputs
            )

        predicted_token = prediction.argmax(-1).item()

        # Stop prediction if "eos" is predicted
        if predicted_token == target_tokenizer.word_index["eos"]:
            break

        translated_sentence.append(predicted_token)

        # Update x_input for next time step
        x_input = torch.tensor([predicted_token], dtype=torch.long)

    # Convert tokens back to text
    translated_sentence = target_tokenizer.sequences_to_texts([translated_sentence])[0]

    return translated_sentence

In [23]:
# Predict
input_sentence = "im thirtyfour"
translated_sentence = predict(model, input_sentence, input_tokenizer, target_tokenizer)
print(f"Input Sentence: {input_sentence}")
print(f"Translated Sentence: {translated_sentence}")

Input Sentence: im thirtyfour
Translated Sentence: tengo treinta y cuatro anos
