# A Transformer Model for Language Translation


# Objectives
After completing this lab, you will be able to:

- Translate a PDF document from German to English




In [1]:
#!pip install -U spacy==3.7.2
#!pip install -Uqq portalocker==2.7.0
#!pip install -qq torchtext==0.14.1
#!pip install -Uq nltk==3.8.1

#!python -m spacy download de
#!python -m spacy download en

#!pip install pdfplumber==0.9.0
#!pip install fpdf==1.7.2

#!wget 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0205EN-SkillsNetwork/Multi30K_de_en_dataloader.py'
#!wget 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0201EN-Coursera/transformer.pt'
#!wget 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0201EN-Coursera/input_de.pdf'

## Importing required libraries


In [2]:

import matplotlib.pyplot as plt
from nltk.translate.bleu_score import sentence_bleu
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math
from tqdm import tqdm

# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

In [3]:
from datasets import load_dataset

dataset = load_dataset("bentrevett/multi30k")
train_data = dataset['train']
val_data = dataset['validation']
test_data = dataset['test']

# Access the data
for example in train_data:
    print(f"English: {example['en']}")
    print(f"German: {example['de']}")
    break

English: Two young, White males are outside near many bushes.
German: Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.


In [4]:
# Define special symbols and indices
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
# Make sure the tokens are in order of their indices to properly insert them in vocab
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

In [5]:
from datasets import load_dataset
from torch.utils.data import DataLoader
import torch
from collections import Counter
import pickle
import os

def build_vocab(sentences, min_freq=2):
    """Build vocabulary from sentences"""
    counter = Counter()
    for sentence in sentences:
        counter.update(sentence.split())

    vocab = {'<unk>': 0, '<pad>': 1, '<bos>': 2, '<eos>': 3}
    for word, freq in counter.items():
        if freq >= min_freq:
            vocab[word] = len(vocab)

    return vocab

def text_to_tensor(text, vocab, max_len=None):
    """Convert text to tensor using vocabulary"""
    tokens = ['<bos>'] + text.split() + ['<eos>']
    if max_len:
        tokens = tokens[:max_len]

    indices = [vocab.get(token, vocab['<unk>']) for token in tokens]
    return torch.tensor(indices, dtype=torch.long)

def get_translation_dataloaders_hf(batch_size=1, max_len=50):
    """
    Replacement for TorchText's get_translation_dataloaders using Hugging Face Datasets
    Returns tensors that can be transposed with .T
    """
    # Load Multi30k dataset
    dataset = load_dataset("bentrevett/multi30k")

    # Get train and validation datasets
    train_dataset = dataset['train']
    val_dataset = dataset['validation']

    # Build vocabularies (you might want to save/load these)
    print("Building vocabularies...")
    en_sentences = [item['en'] for item in train_dataset]
    de_sentences = [item['de'] for item in train_dataset]

    en_vocab = build_vocab(en_sentences)
    de_vocab = build_vocab(de_sentences)

    print(f"English vocab size: {len(en_vocab)}")
    print(f"German vocab size: {len(de_vocab)}")

    def collate_fn(batch):
        """Custom collate function to convert text to tensors"""
        english_tensors = []
        german_tensors = []

        # Find max length in batch for padding
        max_en_len = max(len(item['en'].split()) + 2 for item in batch)  # +2 for <bos>, <eos>
        max_de_len = max(len(item['de'].split()) + 2 for item in batch)

        for item in batch:
            en_tensor = text_to_tensor(item['en'], en_vocab, max_len)
            de_tensor = text_to_tensor(item['de'], de_vocab, max_len)

            # Pad to max length in batch
            en_padded = torch.nn.functional.pad(en_tensor, (0, max_en_len - len(en_tensor)), value=en_vocab['<pad>'])
            de_padded = torch.nn.functional.pad(de_tensor, (0, max_de_len - len(de_tensor)), value=de_vocab['<pad>'])

            english_tensors.append(en_padded)
            german_tensors.append(de_padded)

        # Stack into batch tensors
        english_batch = torch.stack(english_tensors)  # [batch_size, seq_len]
        german_batch = torch.stack(german_tensors)    # [batch_size, seq_len]

        return english_batch, german_batch

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn
    )

    val_dataloader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=collate_fn
    )

    # Store vocabularies as attributes for later use
    train_dataloader.en_vocab = en_vocab
    train_dataloader.de_vocab = de_vocab
    val_dataloader.en_vocab = en_vocab
    val_dataloader.de_vocab = de_vocab

    return train_dataloader, val_dataloader

# Your replacement code:
train_dataloader, val_dataloader = get_translation_dataloaders_hf(batch_size=1)

# Create iterator
data_itr = iter(train_dataloader)

# Now this will work with tensors
english, german = next(data_itr)
print(f"English tensor shape: {english.shape}")
print(f"German tensor shape: {german.shape}")

# Now you can transpose!
german = german.T
english = english.T

print(f"After transpose - English: {english.shape}")
print(f"After transpose - German: {german.shape}")

# Example: decode back to text to verify
def decode_tensor(tensor, vocab):
    """Convert tensor back to text"""
    idx_to_word = {v: k for k, v in vocab.items()}
    words = [idx_to_word.get(idx.item(), '<unk>') for idx in tensor.squeeze()]
    # Remove padding and special tokens for display
    words = [w for w in words if w not in ['<pad>', '<bos>', '<eos>']]
    return ' '.join(words)

print(f"English text: {decode_tensor(english, train_dataloader.en_vocab)}")
print(f"German text: {decode_tensor(german, train_dataloader.de_vocab)}")

Building vocabularies...
English vocab size: 7964
German vocab size: 9762
English tensor shape: torch.Size([1, 17])
German tensor shape: torch.Size([1, 15])
After transpose - English: torch.Size([17, 1])
After transpose - German: torch.Size([15, 1])
English text: An elderly man sits outside a storefront accompanied by a young boy with a cart.
German text: Ein älterer Mann sitzt mit einem Jungen mit einem Wagen vor einer Fassade.


In [6]:
from datasets import load_dataset

# Load dataset
dataset = load_dataset("bentrevett/multi30k")
train_data = dataset['train']

# Create simple iterator
data_itr = iter(train_data)

In [7]:
data_itr=iter(train_dataloader)
data_itr

<torch.utils.data.dataloader._SingleProcessDataLoaderIter at 0x166ac3aa0>

In [8]:
for n in range(1000):
    german, english= next(data_itr)

In [9]:
german=german.T
english=english.T

In [10]:
def index_to_german(tensor, vocab=None):
    """Convert German tensor indices back to text"""
    if vocab is None:
        raise ValueError("Need German vocabulary to decode")

    idx_to_word = {v: k for k, v in vocab.items()}
    if tensor.dim() > 1:
        # Handle batch dimension
        sentences = []
        for i in range(tensor.shape[0]):
            words = [idx_to_word.get(idx.item(), '<unk>') for idx in tensor[i]]
            # Remove special tokens and padding
            words = [w for w in words if w not in ['<pad>', '<bos>', '<eos>']]
            sentences.append(' '.join(words))
        return sentences
    else:
        words = [idx_to_word.get(idx.item(), '<unk>') for idx in tensor]
        words = [w for w in words if w not in ['<pad>', '<bos>', '<eos>']]
        return ' '.join(words)

def index_to_eng(tensor, vocab=None):
    """Convert English tensor indices back to text"""
    if vocab is None:
        raise ValueError("Need English vocabulary to decode")

    idx_to_word = {v: k for k, v in vocab.items()}
    if tensor.dim() > 1:
        # Handle batch dimension
        sentences = []
        for i in range(tensor.shape[0]):
            words = [idx_to_word.get(idx.item(), '<unk>') for idx in tensor[i]]
            # Remove special tokens and padding
            words = [w for w in words if w not in ['<pad>', '<bos>', '<eos>']]
            sentences.append(' '.join(words))
        return sentences
    else:
        words = [idx_to_word.get(idx.item(), '<unk>') for idx in tensor]
        words = [w for w in words if w not in ['<pad>', '<bos>', '<eos>']]
        return ' '.join(words)

# Global variables to store vocabularies
DE_VOCAB = None
EN_VOCAB = None

def set_global_vocabs(train_dataloader):
    """Set global vocabularies for easy access"""
    global DE_VOCAB, EN_VOCAB
    DE_VOCAB = train_dataloader.de_vocab
    EN_VOCAB = train_dataloader.en_vocab

def index_to_german_global(tensor):
    """Convert German tensor to text using global vocab"""
    return index_to_german(tensor, DE_VOCAB)

def index_to_eng_global(tensor):
    """Convert English tensor to text using global vocab"""
    return index_to_eng(tensor, EN_VOCAB)

In [11]:
train_dataloader, _ = get_translation_dataloaders_hf(batch_size=1)
set_global_vocabs(train_dataloader)
data_itr = iter(train_dataloader)


for n in range(10):
    german, english = next(data_itr)
    print("sample {}".format(n))
    print("german input")
    print(index_to_german_global(german))
    print("english target")
    print(index_to_eng_global(english))
    print("_________\n")

Building vocabularies...
English vocab size: 7964
German vocab size: 9762
sample 0
german input
['klettert Rock <unk> Base Leiter ein Kampfsport. Haus']
english target
['giant goalie uniform pots is wooden gentleman boxes.']
_________

sample 1
german input
['Mädchen, liegt die schweißt. Tisches nicht, sitzt Autos, abzufangen,']
english target
['wearing hands hard notice pharmacy ladder do tulips tarp.']
_________

sample 2
german input
['klettert blickt blauen die ein sitzt beiden nicht, ein Hüte Motor schaukeln Kleinkind steigen. Ampel ihres']
english target
['giant am into hard frog an ladder painted blue <unk> <unk> color with mates']
_________

sample 3
german input
['klettert Reihe Jungen roten benutzt sind Sportwagen Leiter am sie']
english target
['purse row reading. <unk> with dancing bushes. large haircut. sand,']
_________

sample 4
german input
['sehen Motorhaube Bluse. ein senkrecht die in Sonnenschein stellt nicht, Hemd zum Bluse. Mehrere am <unk> 5']
english target
['in 

In [12]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE

device(type='cpu')

In [13]:
def generate_square_subsequent_mask(sz,device=DEVICE):
    mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

In [14]:
def create_mask(src, tgt,device=DEVICE):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

### Positional encoding
The transformer model doesn't have built-in knowledge of the order of tokens in the sequence. To give the model this information, positional encodings are added to the tokens embeddings. These encodings have a fixed pattern based on their position in the sequence.


In [15]:
# Add positional information to the input tokens
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

### Token embedding
Token embedding, also known as word embedding or word representation, is a way to convert words or tokens from a text corpus into numerical vectors in a continuous vector space. Each unique word or token in the corpus is assigned a fixed-length vector where the numerical values represent various linguistic properties of the word, such as its meaning, context, or relationships with other words.

The `TokenEmbedding` class below converts numerical tokens into embeddings:


In [16]:
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

In [17]:
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()

        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        outs =outs.to(DEVICE)
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)

## Inference


The diagram below illustrates the sequence prediction or inference process.
<img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-AI0201EN-Coursera/predict_transformers.png" alt="transformer">
The decoder's output is then mapped onto a vocabulary-sized vector using a linear layer. Following this, a softmax function converts these vector scores into probabilities. The highest probability, as determined by the argmax function, provides the index of your predicted word within the translated sequence. This predicted index is fed back into the decoder in conjunction with the initial sequence, setting the stage to determine the subsequent word in the translation. This autoregressive process is demonstrated by the arrow pointing to form the top of the decoder, in green, to the bottom.


In [18]:
# Add this after the get_translation_dataloaders_hf function
vocab_transform = {}

def create_vocab_transform(train_dataloader):
    """Create vocab_transform dictionary for compatibility"""
    global vocab_transform
    vocab_transform = {
        'de': train_dataloader.de_vocab,
        'en': train_dataloader.en_vocab
    }
    return vocab_transform

In [19]:
train_dataloader, _ = get_translation_dataloaders_hf(batch_size=1)
set_global_vocabs(train_dataloader)

# Create vocab_transform for compatibility with existing code
vocab_transform = create_vocab_transform(train_dataloader)

# Now your existing code will work
SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512

print(f"Source (German) vocab size: {SRC_VOCAB_SIZE}")
print(f"Target (English) vocab size: {TGT_VOCAB_SIZE}")

Building vocabularies...
English vocab size: 7964
German vocab size: 9762
Source (German) vocab size: 9762
Target (English) vocab size: 7964


In [20]:
torch.manual_seed(0)

SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

Let's will start off with a trained model.For this, load the weights of the transformer model from the file 'transformer.pt'.





In [21]:
transformer.load_state_dict(torch.load('transformer.pt', map_location=DEVICE, ))

RuntimeError: Error(s) in loading state_dict for Seq2SeqTransformer:
	size mismatch for src_tok_emb.embedding.weight: copying a param with shape torch.Size([19214, 512]) from checkpoint, the shape in current model is torch.Size([9762, 512]).
	size mismatch for tgt_tok_emb.embedding.weight: copying a param with shape torch.Size([10837, 512]) from checkpoint, the shape in current model is torch.Size([7964, 512]).
	size mismatch for generator.weight: copying a param with shape torch.Size([10837, 512]) from checkpoint, the shape in current model is torch.Size([7964, 512]).
	size mismatch for generator.bias: copying a param with shape torch.Size([10837]) from checkpoint, the shape in current model is torch.Size([7964]).

In [None]:
#print("engish target",index_to_eng(tgt))
#print("german input",index_to_german(src))

In [23]:
# First, let's check the vocabulary sizes from the checkpoint
checkpoint = torch.load('transformer.pt', map_location=DEVICE)
src_vocab_size = checkpoint['src_tok_emb.embedding.weight'].shape[0]
tgt_vocab_size = checkpoint['tgt_tok_emb.embedding.weight'].shape[0]

print(f"Checkpoint expects - German vocab: {src_vocab_size}, English vocab: {tgt_vocab_size}")

# Create model with the checkpoint's expected vocabulary sizes
SRC_VOCAB_SIZE = src_vocab_size  # 19214
TGT_VOCAB_SIZE = tgt_vocab_size  # 10837
EMB_SIZE = 512

# Create the model architecture (you'll need your existing model definition)
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS,
                               EMB_SIZE, NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

# Now load the weights
transformer.load_state_dict(checkpoint)
transformer.eval()

print("Model loaded successfully!")

Checkpoint expects - German vocab: 19214, English vocab: 10837
Model loaded successfully!


In [24]:
import pdfplumber
def extract_text_pdfplumber(pdf_path):
    text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text += page.extract_text()
    return text

In [25]:
def preprocess_for_translation(text):
    # Split into sentences
    sentences = text.split('.')
    # Clean each sentence
    cleaned_sentences = []
    for sentence in sentences:
        sentence = sentence.strip()
        if sentence:  # Skip empty sentences
            cleaned_sentences.append(sentence)
    return cleaned_sentences

In [26]:
def translate_text(text, transformer, src_vocab, tgt_vocab, device):
    # Tokenize text using your vocabulary
    tokens = ['<bos>'] + text.split() + ['<eos>']
    src_indices = [src_vocab.get(token, src_vocab['<unk>']) for token in tokens]
    src_tensor = torch.tensor(src_indices).unsqueeze(0).to(device)  # Add batch dimension

    # Generate translation using your transformer
    with torch.no_grad():
        # You'd need to implement the actual inference logic here
        # This depends on how your transformer's forward method works
        output = transformer.generate(src_tensor)  # This method would need to be implemented

    # Convert output indices back to words
    tgt_vocab_reverse = {v: k for k, v in tgt_vocab.items()}
    translated_words = [tgt_vocab_reverse.get(idx.item(), '<unk>') for idx in output.squeeze()]

    return ' '.join(translated_words)

In [27]:
def translate_pdf(pdf_path, transformer, de_vocab, en_vocab, device):
    # Extract text
    text = extract_text_pdfplumber(pdf_path)

    # Split into manageable chunks
    sentences = preprocess_for_translation(text)

    # Translate each sentence
    translated_sentences = []
    for sentence in sentences:
        if sentence.strip():
            try:
                translation = translate_text(sentence, transformer, de_vocab, en_vocab, device)
                translated_sentences.append(translation)
            except Exception as e:
                print(f"Error translating: {sentence[:50]}... Error: {e}")
                translated_sentences.append(f"[TRANSLATION ERROR: {sentence}]")

    return '\n'.join(translated_sentences)

In [28]:
# Check what methods your transformer has
print(dir(transformer))

# Or look for methods containing 'translate', 'generate', 'decode':
methods = [method for method in dir(transformer) if any(word in method.lower() for word in ['translate', 'generate', 'decode', 'forward'])]
print("Relevant methods:", methods)

['T_destination', '__annotations__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_apply', '_backward_hooks', '_backward_pre_hooks', '_buffers', '_call_impl', '_compiled_call_impl', '_forward_hooks', '_forward_hooks_always_called', '_forward_hooks_with_kwargs', '_forward_pre_hooks', '_forward_pre_hooks_with_kwargs', '_get_backward_hooks', '_get_backward_pre_hooks', '_get_name', '_is_full_backward_hook', '_load_from_state_dict', '_load_state_dict_post_hooks', '_load_state_dict_pre_hooks', '_maybe_warn_non_full_backward_hook', '_modules', '_named_members', '_non_persistent_buffers_set', '_parameters', '_register_load_state_dict_pre_hoo

In [41]:
def translate_sentence_simple(sentence, transformer, de_vocab, en_vocab, device):
    try:
        # Convert to tokens
        src_tokens = ['<bos>'] + sentence.split()[:10] + ['<eos>']
        src_indices = [de_vocab.get(token, de_vocab['<unk>']) for token in src_tokens]

        # Create tensors with sequence first [seq_len, batch_size]
        src_tensor = torch.tensor(src_indices).unsqueeze(1).to(device)  # [seq_len, 1]
        tgt_tensor = torch.tensor([en_vocab['<bos>']]).unsqueeze(1).to(device)  # [1, 1]

        src_len = src_tensor.size(0)
        tgt_len = tgt_tensor.size(0)

        # Create masks
        src_mask = torch.zeros((src_len, src_len), device=device)
        tgt_mask = torch.zeros((tgt_len, tgt_len), device=device)
        src_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)
        tgt_padding_mask = torch.zeros((1, tgt_len), dtype=torch.bool, device=device)
        memory_key_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)

        # Forward pass
        with torch.no_grad():
            output = transformer(
                src_tensor, tgt_tensor,
                src_mask, tgt_mask,
                src_padding_mask, tgt_padding_mask,
                memory_key_padding_mask
            )

        # Get the most likely next token
        probs = torch.softmax(output[-1, 0], dim=-1)  # Last position, first batch
        next_token = torch.argmax(probs).item()

        # Convert back to word
        en_vocab_rev = {v: k for k, v in en_vocab.items()}
        word = en_vocab_rev.get(next_token, '<unk>')

        return f"Predicted next word: {word}"

    except Exception as e:
        return f"Error: {str(e)[:150]}"

In [42]:


# Replace in your PDF function:
def translate_pdf_simple(pdf_path, transformer, de_vocab, en_vocab, device):
    import pdfplumber

    with pdfplumber.open(pdf_path) as pdf:
        text = ""
        for page in pdf.pages:
            text += page.extract_text() + " "

    sentences = text.split('.')

    for i, sentence in enumerate(sentences[:3]):
        if sentence.strip():
            print(f"German: {sentence.strip()}")
            translation = translate_sentence_simple(sentence.strip(), transformer, de_vocab, en_vocab, device)
            print(f"English: {translation}")
            print()

# Use it:
# Create vocabularies with the exact sizes the model expects
def create_dummy_vocab(size, prefix="word"):
    vocab = {'<unk>': 0, '<pad>': 1, '<bos>': 2, '<eos>': 3}
    for i in range(4, size):
        vocab[f"{prefix}_{i}"] = i
    return vocab

# Create the vocabularies
de_vocab = create_dummy_vocab(19214, "de")  # German vocab
en_vocab = create_dummy_vocab(10837, "en")  # English vocab

# Now you can use them
translate_pdf_simple('input_de.pdf', transformer, de_vocab, en_vocab, DEVICE)

German: Der frühe Morgen bricht an und die ersten Sonnenstrahlen kitzeln san8 mein Gesicht
English: Predicted next word: en_6

German: Ich atme
=ef ein und spüre die frische Morgenlu8 in meinen Lungen
English: Predicted next word: en_6

German: Mit einem Lächeln auf den Lippen
stehe ich auf und beginne den Tag mit voller Energie
English: Predicted next word: en_6



In [44]:
def greedy_decode(transformer, src_tensor, src_mask, en_vocab, device, max_len=20):
    """Greedy decoding - always pick the most likely next word"""
    generated_tokens = [en_vocab['<bos>']]

    for i in range(max_len):
        tgt_tensor = torch.tensor(generated_tokens).unsqueeze(1).to(device)
        tgt_len = tgt_tensor.size(0)
        src_len = src_tensor.size(0)

        # Create proper masks
        src_mask = torch.zeros((src_len, src_len), device=device)
        tgt_mask = torch.zeros((tgt_len, tgt_len), device=device)  # Fixed: use tgt_len
        src_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)
        tgt_padding_mask = torch.zeros((1, tgt_len), dtype=torch.bool, device=device)
        memory_key_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)

        with torch.no_grad():
            output = transformer(src_tensor, tgt_tensor, src_mask, tgt_mask,
                               src_padding_mask, tgt_padding_mask, memory_key_padding_mask)

        # Greedy: pick the most likely token
        probs = torch.softmax(output[-1, 0], dim=-1)
        next_token = torch.argmax(probs).item()

        if next_token == en_vocab['<eos>']:
            break

        generated_tokens.append(next_token)

    return generated_tokens[1:]  # Remove <bos>

def beam_search(transformer, src_tensor, src_mask, en_vocab, device, beam_size=3, max_len=20):
    """Beam search - keep track of top K sequences"""
    beams = [([en_vocab['<bos>']], 0.0)]

    for i in range(max_len):
        candidates = []

        for sequence, score in beams:
            if sequence[-1] == en_vocab['<eos>']:
                candidates.append((sequence, score))
                continue

            tgt_tensor = torch.tensor(sequence).unsqueeze(1).to(device)
            tgt_len = tgt_tensor.size(0)
            src_len = src_tensor.size(0)

            # Create proper masks
            src_mask_local = torch.zeros((src_len, src_len), device=device)
            tgt_mask = torch.zeros((tgt_len, tgt_len), device=device)  # Fixed
            src_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)
            tgt_padding_mask = torch.zeros((1, tgt_len), dtype=torch.bool, device=device)
            memory_key_padding_mask = torch.zeros((1, src_len), dtype=torch.bool, device=device)

            with torch.no_grad():
                output = transformer(src_tensor, tgt_tensor, src_mask_local, tgt_mask,
                                   src_padding_mask, tgt_padding_mask, memory_key_padding_mask)

            # Get top beam_size tokens
            log_probs = torch.log_softmax(output[-1, 0], dim=-1)
            top_probs, top_indices = torch.topk(log_probs, beam_size)

            for prob, idx in zip(top_probs, top_indices):
                new_sequence = sequence + [idx.item()]
                new_score = score + prob.item()
                candidates.append((new_sequence, new_score))

        # Keep only top beam_size sequences
        beams = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_size]

        # Check if all beams ended
        if all(seq[-1] == en_vocab['<eos>'] for seq, _ in beams):
            break

    best_sequence, best_score = beams[0]
    return best_sequence[1:]  # Remove <bos>

def compare_decoding_methods(sentence, transformer, de_vocab, en_vocab, device):
    """Compare greedy vs beam search for a sentence"""
    try:
        # Prepare source
        src_tokens = ['<bos>'] + sentence.split()[:8] + ['<eos>']
        src_indices = [de_vocab.get(token, de_vocab['<unk>']) for token in src_tokens]
        src_tensor = torch.tensor(src_indices).unsqueeze(1).to(device)
        src_mask = None  # Let the functions create their own masks

        print(f"German: {sentence}")

        # Greedy decoding
        greedy_tokens = greedy_decode(transformer, src_tensor, src_mask, en_vocab, device)
        en_vocab_rev = {v: k for k, v in en_vocab.items()}
        greedy_words = [en_vocab_rev.get(token, f'token_{token}') for token in greedy_tokens]
        print(f"Greedy:     {' '.join(greedy_words)}")

        # Beam search
        beam_tokens = beam_search(transformer, src_tensor, src_mask, en_vocab, device, beam_size=3)
        beam_words = [en_vocab_rev.get(token, f'token_{token}') for token in beam_tokens]
        print(f"Beam(k=3):  {' '.join(beam_words)}")
        print("-" * 50)

    except Exception as e:
        print(f"Error: {str(e)[:100]}")

In [45]:
# Usage:
sentences = [
    "Der frühe Morgen bricht an",
    "Ich gehe zur Schule",
    "Das Wetter ist schön"
]

for sentence in sentences:
    compare_decoding_methods(sentence, transformer, de_vocab, en_vocab, DEVICE)

German: Der frühe Morgen bricht an
Greedy:     en_6 en_193 en_966 en_10 en_199 en_26 en_1365 en_5
Beam(k=3):  en_6 en_193 en_966 en_14 en_26 en_1424 <eos>
--------------------------------------------------
German: Ich gehe zur Schule
Greedy:     en_6 en_193 en_966 en_10 en_199 en_26 en_1365 en_5
Beam(k=3):  en_6 en_193 en_966 en_14 en_26 en_1424 <eos>
--------------------------------------------------
German: Das Wetter ist schön
Greedy:     en_6 en_193 en_966 en_10 en_199 en_26 en_1365 en_5
Beam(k=3):  en_6 en_193 en_966 en_14 en_26 en_1424 <eos>
--------------------------------------------------
