In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [35]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 500):
        super().__init__()

        #PE(pos, 2i) = sin(pos / 1000^(2i/d_model))
        #PE(pos, 2i+1) = cos(pos / 1000^(2i/d_model))
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position*div_term)
        pe[:, 1::2] = torch.cos(position*div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1), :]

In [61]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        
        self.dropout = nn.Dropout(0.1)
    
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)
        return torch.matmul(attn_probs, V)
    
    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        Q = self.w_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.w_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.w_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        
        return self.w_o(attn_output)

In [62]:
class FeedForward(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout=0.1): 
        #d_ff: hidden layer dimension 
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff) #input layer
        self.linear2 = nn.Linear(d_ff, d_model) #hidden layer
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(F.relu(self.linear1(x))))

In [63]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model: int, num_heads: int , d_ff: int, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.ff(x)
        return self.norm2(x + self.dropout(ff_output))
    

In [64]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout=0.1):
        super().__init__()
        self.masked_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        attn_output = self.masked_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.enc_dec_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.ff(x)
        return self.norm3(x + self.dropout(ff_output))

In [65]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size: int, tgt_vocab_size: int, d_model: int = 512, num_heads: int = 8,
                num_layers: int = 6, d_ff: int = 2048, max_len: int = 5000, dropout: float = 0.1):
        super().__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.final_linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def create_padding_mask(self, seq, pad_token=0):
        return (seq != pad_token).unsqueeze(1).unsqueeze(2)

    def create_look_ahead_mask(self, seq_len):
        mask = (torch.triu(torch.ones(seq_len, seq_len), diagonal=1)).bool()
        return ~mask

    def forward(self, src, tgt, src_pad_mask=None, tgt_pad_mask=None):
        if src_pad_mask is None:
            src_pad_mask = self.create_padding_mask(src)
        if tgt_pad_mask is None:
            tgt_pad_mask = self.create_padding_mask(tgt)

        tgt_look_ahead_mask = self.create_look_ahead_mask(tgt.size(1)).to(tgt.device)
        tgt_mask = tgt_pad_mask & tgt_look_ahead_mask

        src_emb = self.dropout(self.pos_encoding(self.src_embedding(src) * math.sqrt(self.src_embedding.embedding_dim)))
        tgt_emb = self.dropout(self.pos_encoding(self.tgt_embedding(tgt) * math.sqrt(self.tgt_embedding.embedding_dim)))

        enc_output = src_emb
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_pad_mask)

        dec_output = tgt_emb
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_pad_mask, tgt_mask)

        return self.final_linear(dec_output)

In [66]:
src_texts = [
    "The Transformer is a powerful deep learning architecture.",
    "It was introduced in the paper Attention is All You Need."
]

tgt_texts = [
    "Transformer là một kiến trúc học sâu mạnh mẽ.",
    "Nó được giới thiệu trong bài báo Attention is All You Need."
]

In [68]:
from torch.utils.data import Dataset, DataLoader
import torch

BOS_TOKEN = "<bos>"
EOS_TOKEN = "<eos>"
PAD_TOKEN = "<pad>"
UNK_TOKEN = "<unk>"

def build_vocab(texts, min_freq=1):
    vocab = {PAD_TOKEN: 0, BOS_TOKEN: 1, EOS_TOKEN: 2, UNK_TOKEN: 3}
    idx = 4
    for text in texts:
        for word in text.lower().split():
            if word not in vocab:
                vocab[word] = idx
                idx += 1
    return vocab

src_vocab = build_vocab(src_texts)
tgt_vocab = build_vocab(tgt_texts)

src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)

def text_to_indices(text, vocab, is_src=True):
    words = text.lower().split() if is_src else text.split()
    return [vocab.get(BOS_TOKEN, 1)] + [vocab.get(w, vocab[UNK_TOKEN]) for w in words] + [vocab.get(EOS_TOKEN, 2)]

class TranslationDataset(Dataset):
    def __init__(self, src_texts, tgt_texts):
        self.src = [torch.tensor(text_to_indices(s, src_vocab)) for s in src_texts]
        self.tgt = [torch.tensor(text_to_indices(t, tgt_vocab, is_src=False)) for t in tgt_texts]
    
    def __len__(self):
        return len(self.src)
    
    def __getitem__(self, idx):
        return self.src[idx], self.tgt[idx]

dataset = TranslationDataset(src_texts, tgt_texts)

def collate_fn(batch):
    src_batch, tgt_batch = zip(*batch)
    src_padded = torch.nn.utils.rnn.pad_sequence(src_batch, padding_value=src_vocab[PAD_TOKEN], batch_first=True)
    tgt_padded = torch.nn.utils.rnn.pad_sequence(tgt_batch, padding_value=tgt_vocab[PAD_TOKEN], batch_first=True)

    tgt_input = tgt_padded[:, :-1]
    tgt_target = tgt_padded[:, 1:]
    return src_padded, tgt_input, tgt_target

dataloader = DataLoader(dataset, batch_size=2, collate_fn=collate_fn)

In [69]:
model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=128,
    num_heads=4,
    num_layers=2,
    d_ff=256,
    max_len=100,
    dropout=0.1
)

model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab[PAD_TOKEN])

In [72]:
epochs = 1000  

for epoch in range(epochs):
    total_loss = 0
    for src, tgt_input, tgt_target in dataloader:
        optimizer.zero_grad()
        
        output = model(src, tgt_input) 
        
        loss = criterion(
            output.reshape(-1, tgt_vocab_size),
            tgt_target.reshape(-1)
        )
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss / len(dataloader):.4f}")

Epoch 0, Loss: 3.2813
Epoch 100, Loss: 0.0907
Epoch 200, Loss: 0.0616
Epoch 300, Loss: 0.0962
Epoch 400, Loss: 0.0565
Epoch 500, Loss: 0.0138
Epoch 600, Loss: 0.1300
Epoch 700, Loss: 0.0588
Epoch 800, Loss: 0.0319
Epoch 900, Loss: 0.0705


In [75]:
model.eval()


Transformer(
  (src_embedding): Embedding(21, 128)
  (tgt_embedding): Embedding(25, 128)
  (pos_encoding): PositionalEncoding()
  (encoder_layers): ModuleList(
    (0-1): 2 x EncoderLayer(
      (self_attn): MultiHeadAttention(
        (w_q): Linear(in_features=128, out_features=128, bias=True)
        (w_k): Linear(in_features=128, out_features=128, bias=True)
        (w_v): Linear(in_features=128, out_features=128, bias=True)
        (w_o): Linear(in_features=128, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ff): FeedForward(
        (linear1): Linear(in_features=128, out_features=256, bias=True)
        (linear2): Linear(in_features=256, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (decoder_layers): Mo

In [76]:
def translate(sentence):
    src_indices = torch.tensor([text_to_indices(sentence, src_vocab, is_src=True)]).to(next(model.parameters()).device)
    
    
    tgt_indices = torch.tensor([[tgt_vocab[BOS_TOKEN]]])
    
    for _ in range(50):  
        tgt_input = tgt_indices
        with torch.no_grad():
            output = model(src_indices, tgt_input)
        next_token = output[:, -1, :].argmax(dim=-1)
        tgt_indices = torch.cat([tgt_indices, next_token.unsqueeze(1)], dim=1)
        
        if next_token.item() == tgt_vocab[EOS_TOKEN]:
            break
    
    
    inv_tgt_vocab = {v: k for k, v in tgt_vocab.items()}
    words = [inv_tgt_vocab.get(i.item(), "") for i in tgt_indices[0, 1:]]  
    return " ".join(words).replace(" <eos>", "")

print(translate("The Transformer is a powerful deep learning architecture."))

<unk> là một kiến trúc học sâu mạnh mẽ.
