In [1]:
import math

import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from tqdm import tqdm

In [2]:
# Device configuration: use the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [3]:
# Hyper-Parameters

# --- Model architecture ---
D_MODEL = 512  # width passed to the Transformer as d_model
NUM_HEADS = 4  # attention heads; d_model must be divisible by this
NUM_LAYERS = 6  # number of encoder layers and of decoder layers
DROPOUT = 0.25
MAX_LENGTH = 25  # sequences are padded to this many tokens
EMBEDDING_DIM = 128  # NOTE(review): not referenced in the cells shown - confirm it is still needed
HIDDEN_DIM = 256  # NOTE(review): not referenced in the cells shown - confirm it is still needed

# --- Optimisation ---
BATCH_SIZE = 32
NUM_EPOCHS = 50
LEARNING_RATE = 1e-3
CLIP = 1  # maximum gradient norm used when clipping

### Data Preprocessing

In [None]:
# Load the parallel sentence pairs (pandas is imported with the other
# dependencies in the first cell so a fresh kernel fails early if it is missing)
df = pd.read_csv("eng_spn.csv")
df.head()

In [5]:
def _add_boundary_markers(sentence):
    """Wrap one target sentence in the start/end markers the decoder relies on."""
    return "<sos> " + sentence + " <eos>"


# Source sentences are used unchanged; every target sentence gets boundary markers.
# NOTE(review): the file is named eng_spn.csv but the target column is labelled
# "French" - confirm which language pair this data actually holds.
input_data = df["English words/sentences"]
target_data = df["French words/sentences"].apply(_add_boundary_markers)

In [6]:
# Tokenize dataset
def _fit_tokenizer(texts):
    """Fit a fresh Tokenizer on ``texts`` and return it with the encoded texts."""
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(texts)
    return tokenizer, tokenizer.texts_to_sequences(texts)


# Source and target sides get independent vocabularies.
# With the Tokenizer's default filters the angle brackets are stripped, so the
# boundary markers end up in the target vocabulary as "sos" and "eos".
input_tokenizer, input_sequences = _fit_tokenizer(input_data)
target_tokenizer, target_sequences = _fit_tokenizer(target_data)

In [7]:
# Pad sequences
# Both sides share the same settings: zeros are appended up to MAX_LENGTH tokens.
# NOTE(review): `truncating` is left at its default, which for Keras is "pre",
# so longer sequences would lose their first tokens - confirm this is intended.
_pad_options = {"maxlen": MAX_LENGTH, "padding": "post"}

padded_input_sequences = pad_sequences(input_sequences, **_pad_options)
padded_target_sequences = pad_sequences(target_sequences, **_pad_options)

In [8]:
# Vocab size
# One slot more than the number of known words: index 0 is the padding value
input_vocab_size = 1 + len(input_tokenizer.word_index)
target_vocab_size = 1 + len(target_tokenizer.word_index)

In [9]:
# Convert to pytorch tensors
# Token ids must be int64 because they are used as embedding indices
input_tensor, target_tensor = (
    torch.tensor(padded_sequences, dtype=torch.long)
    for padded_sequences in (padded_input_sequences, padded_target_sequences)
)

In [10]:
# Dataloader
# Each dataset item is one (source ids, target ids) pair; batches are reshuffled
# on every pass over the data
translation_dataset = TensorDataset(input_tensor, target_tensor)
dataloader = DataLoader(translation_dataset, batch_size=BATCH_SIZE, shuffle=True)

### Positional Encoding

In [11]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    The table is precomputed once for ``max_length`` positions and stored as a
    non-persistent buffer: it follows the module through ``.to()`` / ``.cuda()``
    but is not written to ``state_dict()``, so checkpoints keep the same keys
    as before.

    Args:
        d_model: Size of the last dimension of the embeddings (even or odd).
        max_length: Largest sequence length the table supports.
    """

    def __init__(self, d_model, max_length):
        super().__init__()

        # Positions 0 .. max_length-1 as a column: (max_length, 1)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)

        # One frequency per even index: exp(-2i * ln(10000) / d_model)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * -(torch.log(torch.tensor(10000.0)) / d_model)
        )

        # Sine on even columns, cosine on odd columns
        table = torch.zeros(max_length, d_model)
        table[:, 0::2] = torch.sin(position * div_term)
        # An odd d_model has one odd column fewer than even columns, so the
        # frequency vector is cut to the number of odd columns
        table[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Leading batch dimension so the table broadcasts over the batch:
        # (1, max_length, d_model)
        self.register_buffer(
            "positional_encoding", table.unsqueeze(0), persistent=False
        )

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension is ``d_model``.

        Raises:
            ValueError: If the sequence is longer than ``max_length``.
        """
        seq_length = x.size(1)
        max_length = self.positional_encoding.size(1)
        if seq_length > max_length:
            raise ValueError(
                f"sequence length {seq_length} exceeds the maximum supported "
                f"length {max_length}"
            )

        # .to() is a no-op once the module lives on the same device as x
        return x + self.positional_encoding[:, :seq_length].to(x.device)

### Multihead Attention

In [12]:
class MultiheadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are projected with separate linear layers, split
    into ``num_heads`` slices of ``d_model // num_heads`` features, attended
    per head, merged again and passed through an output projection.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        super().__init__()

        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Input projections for queries, keys and values
        self.w_Q = nn.Linear(d_model, d_model)
        self.w_K = nn.Linear(d_model, d_model)
        self.w_V = nn.Linear(d_model, d_model)

        # Output projection applied to the merged heads
        self.w_O = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend ``Q`` over ``K`` and return the weighted sum of ``V``.

        Positions where ``mask`` equals 0 receive a score of -1e9 before the
        softmax, which drives their weight towards zero.
        """
        scale = math.sqrt(self.head_dim)
        scores = (Q @ K.transpose(-2, -1)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Normalise over the key axis, then mix the values
        weights = scores.softmax(dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape (batch, length, d_model) to (batch, heads, length, head_dim)."""
        batch, length, _ = x.size()
        per_head = x.view(batch, length, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """Reshape (batch, heads, length, head_dim) back to (batch, length, d_model)."""
        batch, _, length, _ = x.size()
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch, length, self.d_model)

    def forward(self, q, k, v, mask=None):
        """Project, attend per head and re-project.

        Args:
            q, k, v: 3-D tensors whose last dimension is ``d_model``.
            mask: Optional tensor broadcastable to the attention scores;
                zeros mark positions that must not be attended to.

        Returns:
            Tensor with the batch and length of ``q`` and ``d_model`` features.
        """
        Q = self.split_heads(self.w_Q(q))
        K = self.split_heads(self.w_K(k))
        V = self.split_heads(self.w_V(v))

        per_head_context = self.scaled_dot_product_attention(Q, K, V, mask)

        return self.w_O(self.combine_heads(per_head_context))

### Feed Forward Neural Network

In [13]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Feature size of the input and of the output.
        d_ff: Feature size of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, d_ff, dropout=0.5):
        super().__init__()

        # Expand to d_ff, apply the non-linearity, regularise, project back
        layers = [
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        ]
        self.ffnn = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the block to the last dimension of ``x``."""
        transformed = self.ffnn(x)
        return transformed

### Encoder

In [14]:
class Encoder(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input and the sum is layer-normalised.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability for both sub-layers.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.5):
        super().__init__()

        # Sub-layers
        self.self_attention = MultiheadAttention(d_model, num_heads)
        self.ffnn = FeedForward(d_model, d_ff, dropout)

        # One normalisation per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # Shared dropout applied to every sub-layer output
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the layer on ``x``; ``mask`` is handed to the self-attention."""
        # Sub-layer 1: every position attends over the whole sequence
        attended = self.self_attention(x, x, x, mask)
        after_attention = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward network
        transformed = self.ffnn(after_attention)
        return self.norm2(after_attention + self.dropout(transformed))

### Decoder

In [15]:
class Decoder(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input and the sum is layer-normalised.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads in both attention sub-layers.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability for all sub-layers.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.5):
        super().__init__()

        # Attention over the target itself, then over the encoder output
        self.self_attention = MultiheadAttention(d_model, num_heads)
        self.cross_attention = MultiheadAttention(d_model, num_heads)

        # Position-wise feed-forward network
        self.ffnn = FeedForward(d_model, d_ff, dropout)

        # One normalisation per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        # Shared dropout applied to every sub-layer output
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """Run the layer.

        Args:
            x: Target-side representations.
            encoder_output: Source-side representations used as keys and
                values of the cross-attention.
            src_mask: Mask handed to the cross-attention.
            tgt_mask: Mask handed to the self-attention.
        """
        # Sub-layer 1: target positions attend to each other
        self_attended = self.self_attention(x, x, x, tgt_mask)
        state = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: target positions query the encoder output
        cross_attended = self.cross_attention(
            state, encoder_output, encoder_output, src_mask
        )
        state = self.norm2(state + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward network
        transformed = self.ffnn(state)
        return self.norm3(state + self.dropout(transformed))

### Transformer

In [16]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer producing target-vocabulary logits.

    Token id 0 is treated as padding on both the source and the target side.

    Args:
        input_vocab_size: Number of rows of the source embedding table.
        target_vocab_size: Number of rows of the target embedding table and
            size of the output logits.
        d_model: Feature size used throughout the model.
        max_length: Longest sequence the positional encoding supports.
        num_heads: Attention heads per attention sub-layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden size of the feed-forward sub-layers.
        dropout: Dropout probability handed to every layer.
    """

    def __init__(
        self,
        input_vocab_size,
        target_vocab_size,
        d_model,
        max_length,
        num_heads,
        num_layers,
        d_ff,
        dropout=0.5,
    ):
        super().__init__()

        # Embedding layer for input
        self.input_embedding = nn.Embedding(input_vocab_size, d_model)

        # Embedding layer for target
        self.target_embedding = nn.Embedding(target_vocab_size, d_model)

        # Positional encoding shared by the source and the target side
        self.positional_encoding = PositionalEncoding(d_model, max_length)

        # Encoder layers
        self.encoder = nn.ModuleList(
            [Encoder(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

        # Decoder layers
        self.decoder = nn.ModuleList(
            [Decoder(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

        # Output layer to map decoder output to target vocab size
        self.fc_out = nn.Linear(d_model, target_vocab_size)

        # NOTE(review): this dropout is created but not applied in forward();
        # confirm whether it was meant to be applied to the embeddings.
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, input, target):
        """Build the attention masks for a batch of token ids.

        Args:
            input: (batch, input_length) source token ids.
            target: (batch, target_length) target token ids.

        Returns:
            ``(input_mask, target_mask)`` as boolean tensors, True where
            attention is allowed. ``input_mask`` has shape
            (batch, 1, 1, input_length); ``target_mask`` has shape
            (batch, 1, target_length, target_length).
        """
        _, target_length = target.size()

        # Source keys that are real tokens (not padding)
        input_mask = (input != 0).unsqueeze(1).unsqueeze(2)

        # Target keys that are real tokens. This must be derived from the
        # target ids: using the source ids would hide or expose the wrong
        # positions and breaks whenever the two lengths differ.
        target_padding_mask = (target != 0).unsqueeze(1).unsqueeze(2)

        # True strictly above the diagonal, i.e. at future positions. Created
        # on the target's device so it can be combined with the padding mask.
        target_no_peek_mask = torch.triu(
            torch.ones((1, target_length, target_length), device=target.device),
            diagonal=1,
        ).bool()

        # A position may attend to real tokens at or before itself
        target_mask = target_padding_mask & ~target_no_peek_mask

        return input_mask, target_mask

    def forward(self, input, target):
        """Return logits of shape (batch, target_length, target_vocab_size).

        Args:
            input: (batch, input_length) source token ids.
            target: (batch, target_length) target token ids fed to the decoder.
        """
        # Generate input and target masks
        input_mask, target_mask = self.generate_mask(input, target)

        # Embed the token ids and add position information
        encoder_state = self.positional_encoding(self.input_embedding(input))
        decoder_state = self.positional_encoding(self.target_embedding(target))

        # Pass input through encoder layers
        for encoder_layer in self.encoder:
            encoder_state = encoder_layer(encoder_state, input_mask)

        # Pass target through decoder layers, attending to the encoder output
        for decoder_layer in self.decoder:
            decoder_state = decoder_layer(
                decoder_state, encoder_state, input_mask, target_mask
            )

        # Output layer for final prediction
        return self.fc_out(decoder_state)

### Training

In [17]:
# Initialize model
# Every setting is passed by name so the call can be read without looking up
# the constructor's parameter order
model = Transformer(
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    d_model=D_MODEL,
    max_length=MAX_LENGTH,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    d_ff=1024,
    dropout=DROPOUT,
)

In [18]:
# Checkpoint


# Save model function
def save_checkpoint(epoch, model, filename="checkpoint.pth"):
    """Write the model weights and the next epoch number to ``filename``.

    Args:
        epoch: Epoch that has just finished; ``epoch + 1`` is what gets stored.
        model: Module whose ``state_dict()`` is saved.
        filename: Destination path of the checkpoint file.
    """
    checkpoint = {
        "epoch": epoch + 1,
        "model_state_dict": model.state_dict(),
    }
    torch.save(checkpoint, filename)


# Load model function
def load_checkpoint(model, filename):
    """Restore ``model`` from a checkpoint file and return its stored epoch.

    Args:
        model: Module that receives the stored ``model_state_dict``.
        filename: Path of a file holding a dict with the keys ``"epoch"`` and
            ``"model_state_dict"``.

    Returns:
        The value stored under ``"epoch"``.

    Raises:
        FileNotFoundError: If ``filename`` does not exist.
    """
    # Tensors are first materialised on the CPU so a checkpoint written on a
    # GPU machine can also be restored where no GPU is available;
    # load_state_dict then copies the values into the model's own parameters.
    # NOTE: torch.load unpickles the file - only load checkpoints you trust.
    checkpoint = torch.load(filename, map_location="cpu")

    model.load_state_dict(checkpoint["model_state_dict"])
    return checkpoint["epoch"]

In [None]:
# Load model
# Continue from the saved checkpoint when there is one, otherwise start at epoch 1
try:
    start_epoch = load_checkpoint(model, filename="checkpoint.pth")
except FileNotFoundError:
    start_epoch = 1
    print("No checkpoint found, starting training from scratch...")
else:
    print(f"Resuming training from epoch: {start_epoch}")

In [20]:
# Initialize Adam optimizer and Loss function
# Targets equal to 0 (the padding id) do not contribute to the loss
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [21]:
# Train function
def train(
    model, optimizer, criterion, dataloader, epochs=NUM_EPOCHS, first_epoch=None
):
    """Train ``model`` with teacher forcing, saving a checkpoint after each epoch.

    Args:
        model: Module called as ``model(source, target)`` that returns logits
            of shape (batch, target_length, vocab_size).
        optimizer: Optimizer that updates ``model.parameters()``.
        criterion: Loss called with flattened logits and flattened token ids.
        dataloader: Iterable of ``(source, target)`` batches of token ids.
        epochs: Number of the last epoch to run (inclusive).
        first_epoch: Number of the first epoch to run. When ``None`` the
            module-level ``start_epoch`` set by the checkpoint-loading cell
            is used.
    """
    if first_epoch is None:
        first_epoch = start_epoch

    # Batches are moved to wherever the model's parameters live
    model_device = next(model.parameters()).device

    model.train()  # Set model to training mode

    total_loss = 0

    for epoch in range(first_epoch, epochs + 1):
        epoch_loss = 0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch}/{epochs}")

        for source, target in progress_bar:
            source = source.to(model_device)
            target = target.to(model_device)

            # Reset gradients
            optimizer.zero_grad()

            # Forward pass
            output = model(source, target)

            # The decoder's no-peek mask lets position t see target tokens
            # 0..t, so its logits have to be scored against token t + 1.
            # Scoring them against token t would reward copying the input.
            # The last position has no next token and is dropped.
            logits = output[:, :-1].reshape(-1, output.shape[2])
            labels = target[:, 1:].reshape(-1)

            # Compute loss and backpropagation
            loss = criterion(logits, labels)
            loss.backward()

            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)

            # Update model parameters
            optimizer.step()

            batch_loss = loss.item()
            epoch_loss += batch_loss
            progress_bar.set_postfix(loss=batch_loss)

        total_loss += epoch_loss

        progress_bar.close()

        save_checkpoint(epoch, model)

    # Sum of all batch losses over every epoch run, divided by batches per epoch
    print(f"Total Loss: {total_loss/len(dataloader)}")

In [None]:
# Training
# Runs from start_epoch up to NUM_EPOCHS and writes a checkpoint after each epoch
train(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    dataloader=dataloader,
)

### Prediction

In [23]:
def predict(
    model, input_text, input_tokenizer, target_tokenizer, max_length=MAX_LENGTH
):
    """Greedily translate ``input_text`` and return the translation as text.

    The source sentence is encoded once; the target is then generated one
    token at a time, starting from the ``sos`` marker and always taking the
    highest-scoring next token, until ``eos`` is produced or ``max_length``
    tokens have been generated.

    Args:
        model: Trained Transformer; it is switched to evaluation mode.
        input_text: Source sentence to translate.
        input_tokenizer: Tokenizer fitted on the source sentences.
        target_tokenizer: Tokenizer fitted on the target sentences; its
            vocabulary must contain ``"sos"`` and ``"eos"``.
        max_length: Padded source length and maximum number of decoding steps.

    Returns:
        The translated sentence as a string, without the boundary markers.
    """
    model.eval()  # Set the model to evaluation mode

    # All tensors are created where the model's parameters live
    model_device = next(model.parameters()).device

    # Tokenizing and padding the input text
    input_sequence = input_tokenizer.texts_to_sequences([input_text])
    padded_input_sequences = pad_sequences(
        input_sequence, maxlen=max_length, padding="post"
    )
    input_tensor = torch.tensor(
        padded_input_sequences, dtype=torch.long, device=model_device
    )

    # sos and eos tokens
    sos_token = target_tokenizer.word_index["sos"]
    eos_token = target_tokenizer.word_index["eos"]

    translated_tokens = []

    with torch.no_grad():
        # Encode the source sentence once; padding positions are masked out
        input_mask = (input_tensor != 0).unsqueeze(1).unsqueeze(2)
        encoder_output = model.positional_encoding(
            model.input_embedding(input_tensor)
        )
        for encoder_layer in model.encoder:
            encoder_output = encoder_layer(encoder_output, input_mask)

        # Start decoding from the sos token
        x_input = torch.tensor([[sos_token]], dtype=torch.long, device=model_device)

        for _ in range(max_length):
            decoder_output = model.positional_encoding(
                model.target_embedding(x_input)
            )

            # Same mask as in training: a position may attend to real tokens
            # at or before itself. The triu mask marks the FUTURE positions,
            # so it has to be negated before it is combined.
            current_length = x_input.size(1)
            target_padded_mask = (x_input != 0).unsqueeze(1).unsqueeze(2)
            target_no_peek_mask = torch.triu(
                torch.ones(1, current_length, current_length, device=model_device),
                diagonal=1,
            ).bool()
            target_mask = target_padded_mask & ~target_no_peek_mask

            # Pass target through the decoder layers
            for decoder_layer in model.decoder:
                decoder_output = decoder_layer(
                    decoder_output, encoder_output, input_mask, target_mask
                )

            # The logits at the last position score the next token
            logits = model.fc_out(decoder_output[:, -1, :])
            predicted_token = torch.argmax(logits, dim=-1).item()

            # If predicted_token is eos, stop prediction
            if predicted_token == eos_token:
                break

            translated_tokens.append(predicted_token)

            next_token = torch.tensor(
                [[predicted_token]], dtype=torch.long, device=model_device
            )
            x_input = torch.cat([x_input, next_token], dim=1)

    # Convert predicted token sequence back to text
    translated_sentence_text = target_tokenizer.sequences_to_texts(
        [translated_tokens]
    )[0]

    return translated_sentence_text

In [None]:
# Predict
# Translate one sample sentence and show it next to its source
input_sentence = "ill teach tom"
translated_sentence = predict(
    model=model,
    input_text=input_sentence,
    input_tokenizer=input_tokenizer,
    target_tokenizer=target_tokenizer,
)
print(
    f"Input Sentence: {input_sentence}",
    f"Translated Sentence: {translated_sentence}",
    sep="\n",
)