In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import math
import random

# Sample toy dataset
data = [
    ("bonjour", "hello"),
    ("je suis étudiant", "i am a student"),
    ("j'aime le football", "i like football"),
    ("il fait beau", "it is sunny"),
    ("merci", "thank you")
]

# Tokenizer + Vocabulary
class Vocab:
    def __init__(self, sentences):
        tokens = set()
        for sentence in sentences:
            tokens.update(sentence.strip().split())
        self.word2idx = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}
        self.idx2word = {0: "<pad>", 1: "<sos>", 2: "<eos>", 3: "<unk>"}
        for i, token in enumerate(tokens, 4):
            self.word2idx[token] = i
            self.idx2word[i] = token

    def encode(self, sentence):
        return [self.word2idx.get(word, 3) for word in sentence.strip().split()] + [2]

    def decode(self, tokens):
        words = [self.idx2word[token] for token in tokens if token not in [0, 1, 2]]
        return ' '.join(words)

    def __len__(self):
        return len(self.word2idx)

# Create vocabularies
french_vocab = Vocab([fr for fr, en in data])
english_vocab = Vocab([en for fr, en in data])

# Custom Dataset
class TranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, idx):
        fr, en = self.data[idx]
        return torch.tensor(french_vocab.encode(fr)), torch.tensor([1] + english_vocab.encode(en))

    def __len__(self):
        return len(self.data)

def collate_fn(batch):
    fr_sentences, en_sentences = zip(*batch)
    fr_pad = nn.utils.rnn.pad_sequence(fr_sentences, padding_value=0)
    en_pad = nn.utils.rnn.pad_sequence(en_sentences, padding_value=0)
    return fr_pad, en_pad

dataset = TranslationDataset(data)
loader = DataLoader(dataset, batch_size=2, collate_fn=collate_fn)

# Transformer Model
class TransformerModel(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=128, nhead=8, num_layers=2):
        super().__init__()
        self.src_embed = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoder = nn.Parameter(self._generate_positional_encoding(d_model, 100))
        self.transformer = nn.Transformer(d_model, nhead, num_layers, num_layers)
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def _generate_positional_encoding(self, d_model, max_len):
        pos = torch.arange(0, max_len).unsqueeze(1)
        i = torch.arange(0, d_model, 2).float()
        angle_rates = 1 / (10000 ** (i / d_model))
        angle_rads = pos * angle_rates
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angle_rads)
        pe[:, 1::2] = torch.cos(angle_rads)
        return pe.unsqueeze(1)

    def forward(self, src, tgt):
        src = self.src_embed(src) + self.pos_encoder[:src.size(0)]
        tgt = self.tgt_embed(tgt) + self.pos_encoder[:tgt.size(0)]
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(0)).to(tgt.device)
        out = self.transformer(src, tgt, tgt_mask=tgt_mask)
        return self.fc_out(out)

# Training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(len(french_vocab), len(english_vocab)).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss(ignore_index=0)

def train(model, loader, epochs=10):
    model.train()
    for epoch in range(epochs):
        for src, tgt in loader:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:-1, :]
            tgt_output = tgt[1:, :]

            optimizer.zero_grad()
            output = model(src, tgt_input)
            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1} Loss: {loss.item():.4f}")

train(model, loader)

# Inference
def translate_sentence(model, sentence):
    model.eval()
    tokens = french_vocab.encode(sentence)
    src = torch.tensor(tokens).unsqueeze(1).to(device)
    src = model.src_embed(src) + model.pos_encoder[:src.size(0)]
    memory = model.transformer.encoder(src)

    ys = torch.ones(1, 1).fill_(1).type(torch.long).to(device)  # <sos>
    for i in range(20):
        tgt_emb = model.tgt_embed(ys) + model.pos_encoder[:ys.size(0)]
        tgt_mask = model.transformer.generate_square_subsequent_mask(ys.size(0)).to(device)
        out = model.transformer.decoder(tgt_emb, memory, tgt_mask=tgt_mask)
        out = model.fc_out(out)
        next_token = out.argmax(dim=-1)[-1, 0].item()
        ys = torch.cat([ys, torch.ones(1, 1).type_as(ys).fill_(next_token)], dim=0)
        if next_token == 2:
            break
    return english_vocab.decode(ys.squeeze().tolist())

# Example usage
print("Translation:", translate_sentence(model, "salut"))


Epoch 1 Loss: 3.5413
Epoch 2 Loss: 1.8870
Epoch 3 Loss: 1.0239
Epoch 4 Loss: 0.6197
Epoch 5 Loss: 0.4051
Epoch 6 Loss: 0.2640
Epoch 7 Loss: 0.1651
Epoch 8 Loss: 0.0995
Epoch 9 Loss: 0.0570
Epoch 10 Loss: 0.0451
Translation: thank you
