# Transformer Model for Translation using PyTorch (YOU ARE NOT EXPECTED TO CODE THIS YOURSELF / FOR INFORMATION ONLY)

In this notebook we implement a Transformer model for translation using PyTorch. We cover the following topics:

1. **Introduction to the Transformer Architecture**
2. **Data Preparation**
3. **Model Architecture**
4. **Training and Evaluation**
5. **Predictions**

## 1. Introduction to the Transformer Architecture

The Transformer is a neural network architecture that uses self-attention mechanisms to process sequential data, such as text for translation. Its main components are:

- **Encoder**: Takes the source sentence and turns it into a sequence of contextual representations.
- **Decoder**: Takes the encoder's contextual representations and generates the sentence in the target language.
- **Self-Attention Mechanism**: Lets the model weigh different parts of the input against each other while processing each token.
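
To make the self-attention idea concrete, here is a minimal, framework-free sketch of scaled dot-product self-attention. It is a simplification under stated assumptions: queries, keys and values are all the raw token vectors (a real Transformer first applies learned projections and uses several heads), and the function names `softmax` and `self_attention` as well as the toy vectors are illustrative, not part of this notebook.

```python
import math

def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def self_attention(x):
    """Scaled dot-product self-attention with Q = K = V = x (no learned weights).

    x is a list of token vectors. Returns (outputs, weights), where
    weights[i][j] says how strongly token i attends to token j.
    """
    d = len(x[0])
    weights = []
    for q in x:
        # Similarity of this query with every key, scaled by sqrt(d)
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d) for k in x]
        weights.append(softmax(scores))
    # Each output is the attention-weighted average of all value vectors
    outputs = [
        [sum(w * v[c] for w, v in zip(row, x)) for c in range(d)]
        for row in weights
    ]
    return outputs, weights

tokens = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # made-up toy embeddings
outputs, weights = self_attention(tokens)
for row in weights:
    print([round(w, 3) for w in row])
```

Each row of `weights` sums to 1, so every output vector is a weighted average of the input vectors; this is what "contextual representation" means in the list above.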

## 2. Data Preparation

We use an example dataset for translation: a small dataset of English sentences paired with French sentences. We tokenize the data and convert it to numeric values.

The following steps are performed:
* Tokenization: Split the text into words and convert them to numeric indices.
* Padding: Make all sentences in a batch the same length by adding padding.
* DataLoader: Handles batching and shuffling of the data during training.
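
The first two steps can be sketched without any framework. The example below is only an illustration: the helper names (`build_toy_vocab`, `encode_sentence`, `pad_batch`) and the two sentences are made up, while the special-token indices follow the same order the notebook code uses (`<unk>`, `<pad>`, `<bos>`, `<eos>`).

```python
UNK, PAD, BOS, EOS = "<unk>", "<pad>", "<bos>", "<eos>"

def split_words(text):
    # Lowercase and split on whitespace
    return text.lower().split()

def build_toy_vocab(sentences):
    # Special tokens get the first four indices, words follow in order of appearance
    vocab = {UNK: 0, PAD: 1, BOS: 2, EOS: 3}
    for sentence in sentences:
        for word in split_words(sentence):
            vocab.setdefault(word, len(vocab))
    return vocab

def encode_sentence(sentence, vocab):
    # Unknown words map to the <unk> index; the sentence is wrapped in <bos>/<eos>
    ids = [vocab.get(word, vocab[UNK]) for word in split_words(sentence)]
    return [vocab[BOS]] + ids + [vocab[EOS]]

def pad_batch(batch, pad_id):
    # Right-pad every sequence to the length of the longest one
    longest = max(len(seq) for seq in batch)
    return [seq + [pad_id] * (longest - len(seq)) for seq in batch]

sentences = ["Go away", "I am here now"]  # made-up toy sentences
vocab = build_toy_vocab(sentences)
batch = pad_batch([encode_sentence(s, vocab) for s in sentences], vocab[PAD])
print(batch)
```

Note that `<eos>` is appended before padding, so it directly follows the last real word of each sentence.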

In [None]:
# Import the required libraries
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
from timeit import default_timer as timer
import os
import kagglehub

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Download and load the dataset
path = kagglehub.dataset_download("devicharith/language-translation-englishfrench")

filename = 'eng_-french.csv'
df = pd.read_csv(os.path.join(path, filename))
# Fixed seed (an arbitrary choice) so the 80/20 train/test split is reproducible
df_train = df.sample(frac=0.8, random_state=0)
df_test = df.drop(df_train.index)

# Look at a few examples of the data
print(df.head())

# Special tokens
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

def tokenize(text):
    return text.lower().split()

def build_vocab(sentences):
    counter = Counter()
    for sentence in sentences:
        counter.update(tokenize(sentence))
    vocab = special_symbols + list(counter.keys())
    return {word: idx for idx, word in enumerate(vocab)}

def sentence_to_tensor(sentence, vocab):
    indices = [vocab.get(word, UNK_IDX) for word in tokenize(sentence)]
    return torch.tensor(indices, dtype=torch.long)

# Collate function for batching
def collate_fn(batch):
    bos = torch.tensor([BOS_IDX], dtype=torch.long)
    eos = torch.tensor([EOS_IDX], dtype=torch.long)

    # Wrap each sentence in BOS/EOS *before* padding, so that EOS directly
    # follows the last real token instead of trailing the padding
    src_batch = [torch.cat((bos, sentence_to_tensor(item['english'], source_vocab), eos)) for item in batch]
    tgt_batch = [torch.cat((bos, sentence_to_tensor(item['french'], target_vocab), eos)) for item in batch]

    # Pad to the longest sequence in the batch -> shape [seq_len, batch]
    src_batch = pad_sequence(src_batch, batch_first=False, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, batch_first=False, padding_value=PAD_IDX)

    return src_batch, tgt_batch

# Dataset wrapper
class TranslationDataset(Dataset):
    def __init__(self, dataframe):
        self.dataframe = dataframe

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        row = self.dataframe.iloc[idx]
        return {
            'english': row['English words/sentences'],
            'french': row['French words/sentences']
        }

# Build vocab
source_vocab = build_vocab(df['English words/sentences'].tolist())
target_vocab = build_vocab(df['French words/sentences'].tolist())

# Make dataset and dataloader
train_dataset = TranslationDataset(df_train)
test_dataset = TranslationDataset(df_test)

train_loader = DataLoader(train_dataset, batch_size=32, collate_fn=collate_fn, shuffle=True)
# Shuffling is not needed for evaluation
test_loader = DataLoader(test_dataset, batch_size=32, collate_fn=collate_fn, shuffle=False)

# Check a batch
for src_batch, tgt_batch in train_loader:
    print("Source shape:", src_batch.shape)
    print("Target shape:", tgt_batch.shape)
    break

## 3. Model Architecture
We now define the Transformer model. We use the encoder-decoder structure with multi-head self-attention.
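
In the decoder, self-attention must not look at future target tokens, which is usually enforced with a causal (square subsequent) mask added to the attention scores. The following framework-free sketch shows the idea; `causal_mask` and `masked_softmax` are illustrative names, and the uniform scores are made up.

```python
import math

def causal_mask(size):
    """Additive mask: 0.0 where attention is allowed, -inf where it is blocked."""
    neg_inf = float("-inf")
    return [[0.0 if col <= row else neg_inf for col in range(size)]
            for row in range(size)]

def masked_softmax(scores, mask_row):
    """Softmax over scores after adding the mask; blocked positions get weight 0."""
    shifted = [s + m for s, m in zip(scores, mask_row)]
    top = max(shifted)
    exps = [math.exp(v - top) for v in shifted]  # exp(-inf) evaluates to 0.0
    total = sum(exps)
    return [e / total for e in exps]

mask = causal_mask(3)
for row in mask:
    print(row)

# Position 1 may attend to positions 0 and 1, but not to position 2
print(masked_softmax([1.0, 1.0, 1.0], mask[1]))
```

Row `i` of the mask allows positions `0..i` only, so during training the decoder predicts each token from the tokens before it.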

In [None]:
from torch import Tensor
import torch.nn as nn
from torch.nn import Transformer
import math

# Adds positional encoding to the token embedding
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout: float, maxlen: int = 100):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# Tokens to embeddings
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Seq2Seq Network
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout,
                                       batch_first=False)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                tgt: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        #print(src.shape, src_emb.shape, src_mask.shape)
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(tgt))
        #print(tgt.shape, tgt_emb.shape, tgt_mask.shape)
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=device).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

torch.manual_seed(0)

# Vocabulary sizes come from the vocab dictionaries built above
SRC_VOCAB_SIZE = len(source_vocab)
TGT_VOCAB_SIZE = len(target_vocab)
EMB_SIZE = 24
NHEAD = 2
FFN_HID_DIM = 32
NUM_ENCODER_LAYERS = 1
NUM_DECODER_LAYERS = 1

# Create the model
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
# Initialize the weights
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(device)
print(transformer)

In [None]:
for i, (src, tgt) in enumerate(train_loader):
    print("Batch shapes:", src.shape, tgt.shape)  # (seq_len, batch)

    src = src.to(device)
    tgt = tgt.to(device)

    # Use target input shifted by one (teacher forcing)
    tgt_input = tgt[:-1, :]   # everything except the last position
    tgt_out   = tgt[1:, :]    # expected output (shifted by one)

    # Create masks
    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

    with torch.no_grad():  # no gradients needed for this shape check
        logits = transformer(
            src, tgt_input,
            src_mask, tgt_mask,
            src_padding_mask, tgt_padding_mask, src_padding_mask
        )

    print("Logits shape:", logits.shape)
    # → (tgt_seq_len, batch_size, vocab_size)

    # Convert logits to predicted token IDs
    preds = torch.argmax(logits, dim=-1)
    print("Predictions shape:", preds.shape)
    # → (tgt_seq_len, batch_size)

    break

## 4. Training and Evaluation

In [None]:
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

def compute_loss(model, loss_fn, src, tgt):
    src = src.to(device)
    tgt = tgt.to(device)

    tgt_input = tgt[:-1, :]

    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
    logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

    tgt_out = tgt[1:, :]
    return loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))

def train_epoch(model, optimizer, dataloader):
    model.train()
    losses = 0
    total_batches = len(dataloader)
    # Report roughly every 10%; at least 1 to avoid a modulo by zero on small datasets
    progress_interval = max(1, total_batches // 10)

    for i, (src, tgt) in enumerate(dataloader):
        optimizer.zero_grad()
        loss = compute_loss(model, loss_fn, src, tgt)
        loss.backward()
        optimizer.step()
        losses += loss.item()
        if (i + 1) % progress_interval == 0:
            progress_percentage = ((i + 1) / total_batches) * 100
            print(f"Training progress: {progress_percentage:.1f}% - Batch {i + 1}/{total_batches}")
    # Average loss per batch; len(dataloader) avoids iterating over the data a second time
    return losses / total_batches

NUM_EPOCHS = 10

# Function to save the model
def save_model(model, optimizer, model_path):
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, model_path)
    print(f"Model saved to {model_path}")
    
# Function to load the model
def load_model(model, optimizer, model_path):
    if os.path.exists(model_path):
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine
        checkpoint = torch.load(model_path, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        print(f"Model loaded from {model_path}")
        return True
    return False

# Check if the model exists, if so load it, otherwise train and save
if not load_model(transformer, optimizer, "transformer_model.pth"):
    for epoch in range(1, NUM_EPOCHS+1):
        start_time = timer()
        train_loss = train_epoch(transformer, optimizer, train_loader)
        end_time = timer()
        print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Epoch time = {(end_time - start_time):.3f}s"))
    save_model(transformer, optimizer, "transformer_model.pth")

## 5. Translations: EXERCISE

Use your favorite AI tool to find an implementation of the translate_sentence function.
This function should generate a sentence word by word.
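
Generating "word by word" usually means greedy decoding: start from `<bos>`, repeatedly pick the highest-scoring next token, and stop at `<eos>` or at a length limit. The sketch below shows only that loop. The scoring table `TOY_SCORES` is a hypothetical stand-in for a trained decoder (a real model would condition on the source sentence and the full prefix), and all names here are illustrative.

```python
BOS, EOS = "<bos>", "<eos>"

# Hypothetical scores: last generated token -> candidate next tokens
TOY_SCORES = {
    BOS: {"bonjour": 2.0, "le": 0.5},
    "bonjour": {"le": 1.5, EOS: 0.2},
    "le": {"monde": 3.0, EOS: 0.1},
    "monde": {EOS: 4.0, "le": 0.3},
}

def toy_next_token_scores(prefix):
    # Stand-in for "run the decoder and read the scores at the last position"
    return TOY_SCORES[prefix[-1]]

def greedy_decode(score_fn, max_len=10):
    prefix = [BOS]
    for _ in range(max_len):
        scores = score_fn(prefix)
        next_token = max(scores, key=scores.get)  # argmax over candidates
        if next_token == EOS:
            break
        prefix.append(next_token)
    return prefix[1:]  # drop <bos>

print(" ".join(greedy_decode(toy_next_token_scores)))
```

Greedy decoding is the simplest strategy; it commits to the best token at every step and never revisits earlier choices.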

In [None]:
def get_key_from_value(my_dict, target_value):
    for key, value in my_dict.items():
        if value == target_value:
            return key
    return None

def translate_sentence(model, src_sentence, src_vocab, tgt_vocab, max_len=50):
    model.eval()

    # Tokenize as in training (lowercase + whitespace split) and wrap in BOS/EOS
    src_tokens = [BOS_IDX] + [src_vocab.get(token, UNK_IDX) for token in tokenize(src_sentence)] + [EOS_IDX]
    src_tensor = torch.tensor(src_tokens, dtype=torch.long).unsqueeze(1).to(device)
    src_mask = torch.zeros((src_tensor.shape[0], src_tensor.shape[0]), device=device).type(torch.bool)

    tgt_tokens = [BOS_IDX]
    with torch.no_grad():  # greedy decoding needs no gradients
        memory = model.encode(src_tensor, src_mask)
        for _ in range(max_len):
            tgt_tensor = torch.tensor(tgt_tokens, dtype=torch.long).unsqueeze(1).to(device)
            tgt_mask = generate_square_subsequent_mask(tgt_tensor.size(0)).to(device)
            output = model.decode(tgt_tensor, memory, tgt_mask)
            # Take the prediction at the last position and pick the most likely token
            output_logits = model.generator(output[-1, :])
            next_token = torch.argmax(output_logits, dim=-1).item()
            if next_token == EOS_IDX:
                break
            tgt_tokens.append(next_token)

    # Map ids back to words, skipping BOS; EOS is never appended, so no token is lost
    # when max_len is reached. Fall back to '<unk>' if an id is not in the vocab.
    translated_tokens = [get_key_from_value(tgt_vocab, tok) or '<unk>' for tok in tgt_tokens[1:]]
    translated_sentence = " ".join(translated_tokens)
    return translated_sentence

In [None]:
# Example Usage
src_sentence = "translate this text to French"
translated_sentence = translate_sentence(transformer, src_sentence, source_vocab, target_vocab)
print(f"Translated sentence: {translated_sentence}")