In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

In [5]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model,num_heads):
    super(MultiHeadAttention, self).__init__()
    assert d_model % num_heads == 0

    self.d_model = d_model
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    self.W_q = nn.Linear(d_model, d_model)
    self.W_k = nn.Linear(d_model, d_model)
    self.W_v = nn.Linear(d_model, d_model)
    self.W_o = nn.Linear(d_model, d_model)

  def scaled_dot_product_attention(self, Q, K, V, mask = None):
    attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
    
    if mask is not None:
      attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
    
    attn_probs = torch.softmax(attn_scores, dim = -1)

    output = torch.matmul(attn_probs, V)
    return output
  
  def split_heads(self, x):
    batch_size, seq_length, embed_dim = x.size()
    x = x.view(batch_size, seq_length, self.num_heads, self.d_k)
    return x.transpose(1, 2)


  def combine_heads(self, x):
    batch_size, _, seq_length, d_k = x.size()
    return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
  def forward(self, Q,K,V, mask = None):
    Q = self.split_heads(self.W_q(Q))
    K = self.split_heads(self.W_q(K))
    V = self.split_heads(self.W_q(V))


    attn_output = self.scaled_dot_product_attention(Q,K,V,mask)
    
    output = self.W_o(self.combine_heads(attn_output))
    return output

class PositionWiseFeedForward(nn.Module):
  def __init__(self, d_model, d_ff):
    super(PositionWiseFeedForward,self).__init__()
    self.fc1 = nn.Linear(d_model, d_ff)
    self.fc2 = nn.Linear(d_ff,d_model)
    self.relu = nn.ReLU()
  def forward(self, x):
    return self.fc2(self.relu(self.fc1(x)))

class PositionalEncoding(nn.Module):
  def __init__(self,d_model,max_seq_length):
    super(PositionalEncoding,self).__init__()
    pe = torch.zeros(max_seq_length, d_model)
    position = torch.arange(0,max_seq_length,dtype = torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0,d_model,2).float()* -(math.log(10000.0)/d_model))
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    return x + self.pe[:, :x.size(1)]

class EncoderLayer(nn.Module):
  def __init__(self, d_model, num_heads, d_ff, dropout):
    super(EncoderLayer, self).__init__()
    self.self_attn = MultiHeadAttention(d_model, num_heads)
    self.feed_forward = PositionWiseFeedForward(d_model,d_ff)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)
  def forward(self, x, mask):
    attn_output = self.self_attn(x, x, x, mask)
    x = self.norm1(x +self.dropout(attn_output))
    ff_output = self.feed_forward(x)
    x = self.norm2(x +self.dropout(ff_output))
    return x

class DecoderLayer(nn.Module):
  def __init__(self,d_model, num_heads, d_ff, dropout):
    super(DecoderLayer, self).__init__()
    self.self_attn = MultiHeadAttention(d_model,num_heads)
    self.cross_attn = MultiHeadAttention(d_model,num_heads)
    self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)
  
  def forward(self, x, enc_output, src_mask, tgt_mask):
    attn_output = self.self_attn(x,x,x, tgt_mask)
    x = self.norm1(x + self.dropout(attn_output))
    attn_output = self.cross_attn(x,enc_output,enc_output,src_mask)
    x = self.norm2(x + self.dropout(attn_output))
    ff_output = self.feed_forward(x)
    x = self.norm3(x + self.dropout(ff_output))
    return x

class Transformer(nn.Module):
  def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads,num_layers,d_ff,max_seq_length,dropout):
    super(Transformer,self).__init__()
    self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
    self.decoder_embedding = nn.Embedding(tgt_vocab_size,d_model)
    self.positional_encoding = PositionalEncoding(d_model,max_seq_length)
    self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
    self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
    self.fc = nn.Linear(d_model,tgt_vocab_size)
    self.dropout = nn.Dropout(dropout)
  
  def generate_mask(self, src, tgt):
    device = src.device 

    src_mask = (src != 0).unsqueeze(1).unsqueeze(2).to(device)  # ensure device
    tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3).to(device)

    seq_length = tgt.size(1)

    nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=device), diagonal=1)).bool()

    tgt_mask = tgt_mask & nopeak_mask

    return src_mask, tgt_mask


  def forward(self,src,tgt):
    src_mask,tgt_mask = self.generate_mask(src,tgt)
    src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
    tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
    enc_output = src_embedded
    for enc_layer in self.encoder_layers:
      enc_output = enc_layer(enc_output,src_mask)
    
    dec_output = tgt_embedded
    for dec_layer in self.decoder_layers:
      dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

    output = self.fc(dec_output)
    
    return output

In [6]:
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1



In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import spacy
from torch.nn.utils.rnn import pad_sequence
import numpy as np
import pandas as pd
from collections import Counter

# Load SpaCy tokenizers
spacy_en = spacy.load("en_core_web_sm")
spacy_de = spacy.load("de_core_news_sm")

def tokenize_en(text):
    return [tok.text.lower() for tok in spacy_en.tokenizer(text)]

def tokenize_de(text):
    return [tok.text.lower() for tok in spacy_de.tokenizer(text)]

# Load Multi30k train data
def load_multi30k_from_folder(path):
    with open(f"{path}/en.txt", encoding="utf-8") as f_en, open(f"{path}/de.txt", encoding="utf-8") as f_de:
        en_sentences = f_en.read().strip().split('\n')
        de_sentences = f_de.read().strip().split('\n')
    return list(zip(en_sentences, de_sentences))

def load_tatoeba_parallel(path):
    # Assuming tab-separated file (or whitespace-separated)
    df = pd.read_csv(path, sep='\t', header=None, usecols=[0,1], names=['en', 'de'], encoding='utf-8', nrows = 50000)
    
    # Convert to list of tuples (en, de)
    pairs = list(zip(df['en'].tolist(), df['de'].tolist()))
    return pairs

data = load_tatoeba_parallel("archive/deu.txt")

# data = load_multi30k_from_folder("data")  # Change path as needed
print(f"Loaded {len(data)} sentence pairs")

# Special tokens
SPECIAL_TOKENS = ["<pad>", "<unk>", "<sos>", "<eos>"]

# Build vocabularies manually
def build_vocab_manual(data, index, tokenizer, specials=SPECIAL_TOKENS, min_freq=1):
    counter = Counter()
    for pair in data:
        tokens = tokenizer(pair[index])
        counter.update(tokens)
    itos = list(specials)
    itos += [tok for tok, freq in counter.items() if freq >= min_freq and tok not in specials]
    stoi = {tok: i for i, tok in enumerate(itos)}
    return stoi, itos

src_stoi, src_itos = build_vocab_manual(data, 0, tokenize_en)
tgt_stoi, tgt_itos = build_vocab_manual(data, 1, tokenize_de)

PAD_IDX = src_stoi["<pad>"]
UNK_IDX = src_stoi["<unk>"]
SOS_IDX = src_stoi["<sos>"]
EOS_IDX = src_stoi["<eos>"]

# Dataset class using manual stoi dicts
class TranslationDataset(Dataset):
    def __init__(self, data, src_stoi, tgt_stoi, src_tokenizer, tgt_tokenizer):
        self.data = data
        self.src_stoi = src_stoi
        self.tgt_stoi = tgt_stoi
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src_sentence, tgt_sentence = self.data[idx]
        src_tokens = ["<sos>"] + self.src_tokenizer(src_sentence) + ["<eos>"]
        tgt_tokens = ["<sos>"] + self.tgt_tokenizer(tgt_sentence) + ["<eos>"]

        src_ids = torch.tensor([self.src_stoi.get(tok, UNK_IDX) for tok in src_tokens], dtype=torch.long)
        tgt_ids = torch.tensor([self.tgt_stoi.get(tok, UNK_IDX) for tok in tgt_tokens], dtype=torch.long)

        return src_ids, tgt_ids

# Collate function for padding batches
def collate_fn(batch):
    src_batch, tgt_batch = zip(*batch)
    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX, batch_first=True)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX, batch_first=True)
    return src_batch, tgt_batch

# Prepare Dataset and DataLoader
train_dataset = TranslationDataset(data, src_stoi, tgt_stoi, tokenize_en, tokenize_de)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)


# Example model setup (replace with your actual Transformer)

SRC_VOCAB_SIZE = len(src_stoi)
TGT_VOCAB_SIZE = len(tgt_stoi)
MAX_LEN = 100

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

model = Transformer(
    SRC_VOCAB_SIZE,
    TGT_VOCAB_SIZE,
    d_model=128,
    num_heads=2,
    num_layers=1,
    d_ff=256,
    max_seq_length=MAX_LEN,
    dropout=0.1
).to(device)

optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Use the manual vocab dict for padding index here:
criterion = nn.CrossEntropyLoss(ignore_index=src_stoi["<pad>"])

# Training loop
for epoch in range(500):
    model.train()
    total_loss = 0
    for src, tgt in train_loader:
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        preds = model(src, tgt_input)
        preds = preds.reshape(-1, preds.shape[-1])
        tgt_output = tgt_output.reshape(-1)
        loss = criterion(preds, tgt_output)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1} Loss: {total_loss / len(train_loader):.4f}")



Loaded 1000 sentence pairs
cuda
Epoch 1 Loss: 7.7248
Epoch 2 Loss: 7.3152
Epoch 3 Loss: 6.9438
Epoch 4 Loss: 6.6477
Epoch 5 Loss: 6.4008
Epoch 6 Loss: 6.1973
Epoch 7 Loss: 6.0205
Epoch 8 Loss: 5.8626
Epoch 9 Loss: 5.7197
Epoch 10 Loss: 5.6018
Epoch 11 Loss: 5.5011
Epoch 12 Loss: 5.4093
Epoch 13 Loss: 5.3368
Epoch 14 Loss: 5.2678
Epoch 15 Loss: 5.1963
Epoch 16 Loss: 5.1406
Epoch 17 Loss: 5.0858
Epoch 18 Loss: 5.0334
Epoch 19 Loss: 4.9875
Epoch 20 Loss: 4.9402
Epoch 21 Loss: 4.8886
Epoch 22 Loss: 4.8480
Epoch 23 Loss: 4.8066
Epoch 24 Loss: 4.7707
Epoch 25 Loss: 4.7185
Epoch 26 Loss: 4.6901
Epoch 27 Loss: 4.6478
Epoch 28 Loss: 4.6146
Epoch 29 Loss: 4.5757
Epoch 30 Loss: 4.5393
Epoch 31 Loss: 4.5127
Epoch 32 Loss: 4.4848
Epoch 33 Loss: 4.4409
Epoch 34 Loss: 4.4135
Epoch 35 Loss: 4.3868
Epoch 36 Loss: 4.3427
Epoch 37 Loss: 4.3185
Epoch 38 Loss: 4.2958
Epoch 39 Loss: 4.2631
Epoch 40 Loss: 4.2321
Epoch 41 Loss: 4.2038
Epoch 42 Loss: 4.1820
Epoch 43 Loss: 4.1555
Epoch 44 Loss: 4.1351
Epoch 45 

In [26]:
def evaluate(model, src_sentence, src_stoi, tgt_stoi, tgt_itos, max_len=50, device='cuda'):
    model.eval()
    
    # Tokenize and convert source sentence using your tokenizer, here assumed pre-tokenized list of ints
    src_tensor = torch.tensor(src_sentence, dtype=torch.long).unsqueeze(0).to(device)  # (1, src_len)
    
    sos_token_id = tgt_stoi["<sos>"]
    eos_token_id = tgt_stoi["<eos>"]
    
    tgt_tensor = torch.tensor([[sos_token_id]], dtype=torch.long).to(device)  # (1,1)
    
    for _ in range(max_len):
        with torch.no_grad():
            output = model(src_tensor, tgt_tensor)  # (1, tgt_len, vocab_size)
        
        next_token_logits = output[0, -1, :]  # logits for last token
        next_token_id = torch.argmax(next_token_logits).item()
        
        tgt_tensor = torch.cat([tgt_tensor, torch.tensor([[next_token_id]], device=device)], dim=1)
        
        if next_token_id == eos_token_id:
            break
    
    predicted_ids = tgt_tensor[0].tolist()
    predicted_tokens = [tgt_itos[i] if i < len(tgt_itos) else "<unk>" for i in predicted_ids]
    
    return predicted_tokens

# Use your actual tokenizer here instead of split()
src_text = "A basketball player is taking a shot."
src_tokens = tokenize_en(src_text)  # <-- your actual tokenizer function
src_ids = [src_stoi.get(tok, src_stoi["<unk>"]) for tok in src_tokens]

result = evaluate(model, src_ids, src_stoi, tgt_stoi, tgt_itos, device=device)
print("Generated:", " ".join(result))


Generated: <sos> ein basketballspieler wirft auf den korb . <eos>
