In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import math
import random
import re
from collections import Counter
from nltk.translate.bleu_score import corpus_bleu

In [4]:
def normalize_string(s):
    s = s.lower().strip() # Lowercasing
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s

class Vocab:
    def __init__(self):
        self.word2idx = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}
        self.idx2word = {0: "<pad>", 1: "<sos>", 2: "<eos>", 3: "<unk>"}
        self.idx = 4

    def add_sentence(self, sentence):
        for word in sentence.split(' '):
            if word not in self.word2idx:
                self.word2idx[word] = self.idx
                self.idx2word[self.idx] = word
                self.idx += 1

# Load and sample dataset
def load_dataset(filepath, num_samples=10000):
    eng_sentences, spa_sentences = [], []
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Sample subset
    lines = lines[:num_samples]

    for line in lines:
        parts = line.split('\t') # Separated by a tab
        if len(parts) >= 2:
            eng = normalize_string(parts[0])
            spa = normalize_string(parts[1])
            eng_sentences.append(eng)
            spa_sentences.append(spa)

    return eng_sentences, spa_sentences

eng_data, spa_data = load_dataset('spa.txt', num_samples=10000)

eng_vocab = Vocab()
spa_vocab = Vocab()

for eng, spa in zip(eng_data, spa_data):
    eng_vocab.add_sentence(eng)
    spa_vocab.add_sentence(spa)

In [5]:
class TranslationDataset(Dataset):
    def __init__(self, src_data, trg_data, src_vocab, trg_vocab, max_len=50):
        self.src_data = src_data
        self.trg_data = trg_data
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_data)

    def process_sentence(self, sentence, vocab):
        tokens = [vocab.word2idx.get(word, vocab.word2idx["<unk>"]) for word in sentence.split(' ')]
        tokens = [vocab.word2idx["<sos>"]] + tokens + [vocab.word2idx["<eos>"]]
        # Padding
        if len(tokens) < self.max_len:
            tokens += [vocab.word2idx["<pad>"]] * (self.max_len - len(tokens))
        return torch.tensor(tokens[:self.max_len])

    def __getitem__(self, idx):
        src = self.process_sentence(self.src_data[idx], self.src_vocab)
        trg = self.process_sentence(self.trg_data[idx], self.trg_vocab)
        return src, trg

dataset = TranslationDataset(eng_data, spa_data, eng_vocab, spa_vocab)

# Splitting 80% Train, 10% Val, 10% Test
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [6]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Sinusoidal positional encoding
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Adding positional encoding to input embeddings
        return x + self.pe[:, :x.size(1), :]

In [13]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Masking for padded tokens or future tokens in decoder
            scores = scores.masked_fill(mask == 0, -1e9)

        attention = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention, V)
        return output

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        Q = self.W_q(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # REMOVED: mask = mask.unsqueeze(1)  <-- This was causing the dimension mismatch

        # Extend to multi-head attention
        attention_output = self.scaled_dot_product_attention(Q, K, V, mask)
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.W_o(attention_output)

In [14]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Multi-head self-attention layer with residual connection and layer normalization
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Position-wise Feed Forward Network with residual connection and layer normalization
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

In [15]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, trg_mask):
        # Masked multi-head self-attention
        self_attn_output = self.self_attn(x, x, x, trg_mask)
        x = self.norm1(x + self.dropout(self_attn_output))

        # Encoder-Decoder (cross) attention
        cross_attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))

        # Feed Forward Network
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [16]:
class Seq2SeqTransformer(nn.Module):
    def __init__(self, src_vocab_size, trg_vocab_size, d_model=512, num_heads=8, num_layers=6, d_ff=2048, max_len=5000, dropout=0.1):
        super(Seq2SeqTransformer, self).__init__()

        # 1. Embedding Layer
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.trg_embedding = nn.Embedding(trg_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # Stack multiple encoder layers
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        # Stack multiple decoder layers
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc_out = nn.Linear(d_model, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

    def make_src_mask(self, src, src_pad_idx):
        src_mask = (src != src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask

    def make_trg_mask(self, trg, trg_pad_idx):
        trg_pad_mask = (trg != trg_pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=trg.device)).bool()
        trg_mask = trg_pad_mask & trg_sub_mask
        return trg_mask

    def forward(self, src, trg, src_pad_idx, trg_pad_idx):
        src_mask = self.make_src_mask(src, src_pad_idx)
        trg_mask = self.make_trg_mask(trg, trg_pad_idx)

        # Source embedding and positional encoding
        src_emb = self.dropout(self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model)))

        # Pass through Encoder
        enc_output = src_emb
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)

        # Target embedding and positional encoding
        trg_emb = self.dropout(self.pos_encoder(self.trg_embedding(trg) * math.sqrt(self.d_model)))

        # Pass through Decoder
        dec_output = trg_emb
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_mask, trg_mask)

        output = self.fc_out(dec_output)
        return output

In [17]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PAD_IDX = eng_vocab.word2idx["<pad>"]

model = Seq2SeqTransformer(
    src_vocab_size=eng_vocab.idx,
    trg_vocab_size=spa_vocab.idx,
    d_model=256,
    num_heads=8,
    num_layers=3,
    d_ff=512,
    dropout=0.1
).to(device)

# Optimizer & Loss
optimizer = optim.Adam(model.parameters(), lr=0.0005) # Adam optimizer
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) # Cross-Entropy with padding mask

In [18]:
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for i, (src, trg) in enumerate(iterator):
        src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()

        # Teacher forcing: Decoder input is everything except last token
        trg_input = trg[:, :-1]
        # Target for loss is everything except first token (<sos>)
        trg_target = trg[:, 1:]

        output = model(src, trg_input, PAD_IDX, PAD_IDX)

        output_dim = output.shape[-1]
        output = output.contiguous().view(-1, output_dim)
        trg_target = trg_target.contiguous().view(-1)

        loss = criterion(output, trg_target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Training execution
N_EPOCHS = 10
CLIP = 1

print("Starting Training...")
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')

Starting Training...
Epoch: 01 | Train Loss: 3.664
Epoch: 02 | Train Loss: 2.551
Epoch: 03 | Train Loss: 1.981
Epoch: 04 | Train Loss: 1.537
Epoch: 05 | Train Loss: 1.190
Epoch: 06 | Train Loss: 0.917
Epoch: 07 | Train Loss: 0.730
Epoch: 08 | Train Loss: 0.591
Epoch: 09 | Train Loss: 0.493
Epoch: 10 | Train Loss: 0.445


In [19]:
def calculate_bleu(test_loader, model, eng_vocab, spa_vocab, device):
    model.eval()
    targets = []
    predictions = []

    with torch.no_grad():
        for src, trg in test_loader:
            src = src.to(device)
            trg = trg.to(device)

            # Greedy decoding for evaluation
            batch_size = src.shape[0]
            trg_indexes = torch.full((batch_size, 1), spa_vocab.word2idx["<sos>"], dtype=torch.long, device=device)

            for _ in range(50): # Max len
                output = model(src, trg_indexes, PAD_IDX, PAD_IDX)
                pred_token = output.argmax(2)[:, -1].unsqueeze(1)
                trg_indexes = torch.cat((trg_indexes, pred_token), dim=1)

            # Convert indices to words for BLEU calculation
            for i in range(batch_size):
                pred_words = []
                for idx in trg_indexes[i, 1:]:
                    if idx.item() == spa_vocab.word2idx["<eos>"]: break
                    pred_words.append(spa_vocab.idx2word[idx.item()])
                predictions.append(pred_words)

                trg_words = []
                for idx in trg[i, 1:]:
                    if idx.item() == spa_vocab.word2idx["<eos>"]: break
                    if idx.item() != PAD_IDX:
                        trg_words.append(spa_vocab.idx2word[idx.item()])
                targets.append([trg_words])

    bleu_score = corpus_bleu(targets, predictions) * 100
    return bleu_score

# Evaluate translation performance using BLEU score
bleu = calculate_bleu(test_loader, model, eng_vocab, spa_vocab, device)
print(f'Test BLEU Score: {bleu:.2f}')

Test BLEU Score: 24.64
