# Attention is all you need 📜

In [98]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math
import numpy as np
import re
from collections import Counter

In [99]:
MAX_SEQ_LEN = 20

In [100]:
PATH = 'datasets/traductor-spanish/Sentence pairs in English-Spanish - 2025-04-04.tsv'

eng_sentence = []
spa_sentence = []
eng_spa_pairs = []
with open(PATH, 'r', encoding = 'utf-8') as file:
    for line in file:
        parts = line.strip().split('\t')
        if len(parts) >= 4: 
            eng = parts[1]
            spa = parts[3]
            eng_sentence.append(eng)
            spa_sentence.append(spa)

            eng_spa_pairs.append([eng, spa])


eng_spa_pairs[-5:]

[['Can you respond?', '¿Puedes contestar?'],
 ['You must love your country.', 'Uno debe amar a su patria.'],
 ['This herbal tea is very hot.', 'Esta tisana está muy caliente.'],
 ['Who opened the window?', '¿Quién abrió la ventana?'],
 ['The exam ends in half an hour.', 'El examen termina en media hora.']]

In [101]:

def preprocess_sentences(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r'[" "]+', ' ', sentence)
    sentence = re.sub(r'[^a-z]+', ' ', sentence)
    sentence = re.sub(r"[á]+", "a", sentence)
    sentence = re.sub(r"[é]+", "e", sentence)
    sentence = re.sub(r"[í]+", "i", sentence)
    sentence = re.sub(r"[ó]+", "o", sentence)
    sentence = re.sub(r"[ú]+", "u", sentence)
    sentence = sentence.strip()
    sentence = '<sos> ' + sentence + ' <eos>'
    return sentence

s1 = 'Hola como estas? 123'

preprocess_sentences(s1)

'<sos> hola como estas <eos>'

In [102]:
eng_sentences = [preprocess_sentences(sentence) for sentence in eng_sentence]
spa_sentences = [preprocess_sentences(sentence) for sentence in spa_sentence]


In [103]:
eng_sentences[:5], spa_sentences[:5]

(['<sos> let s try something <eos>',
  '<sos> i have to go to sleep <eos>',
  '<sos> today is june th and it is muiriel s birthday <eos>',
  '<sos> today is june th and it is muiriel s birthday <eos>',
  '<sos> muiriel is now <eos>'],
 ['<sos> intentemos algo <eos>',
  '<sos> tengo que irme a dormir <eos>',
  '<sos> hoy es de junio y es el cumplea os de muiriel <eos>',
  '<sos> hoy es el de junio y es el cumplea os de muiriel <eos>',
  '<sos> ahora muiriel tiene a os <eos>'])

In [104]:
print(eng_sentence[-2:])
print(spa_sentence[-2:])

['Who opened the window?', 'The exam ends in half an hour.']
['¿Quién abrió la ventana?', 'El examen termina en media hora.']


## Vocabulary

In [105]:
def build_vocab(sentences):
    words = [word for sentence in sentences for word in sentence.split()]
    word_count = Counter(words)
    sorted_word_counts = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
    word2idx = {word: idx for idx, (word, _) in enumerate(sorted_word_counts, 2)}
    word2idx['<pad>'] = 0
    word2idx['<unk>'] = 1
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word



In [106]:
eng_word2idx, eng_idx2word = build_vocab(eng_sentences)
spa_word2idx, spa_idx2word = build_vocab(spa_sentences)
eng_vocab_size = len(eng_word2idx)
spa_vocab_size = len(spa_word2idx)

In [107]:

print('English Vocabulary Size:', eng_vocab_size)
print('Spanish Vocabulary Size:', spa_vocab_size)

English Vocabulary Size: 27968
Spanish Vocabulary Size: 43578


In [108]:
print(eng_word2idx)



In [109]:
class EngSpaDataset(Dataset):
    def __init__(self, eng_sentences, spa_sentences, eng_word2idx, spa_word2idx):
        self.eng_sentences = eng_sentences
        self.spa_sentences = spa_sentences
        self.eng_word2idx = eng_word2idx
        self.spa_word2idx = spa_word2idx

    def __len__(self):
        assert len(self.eng_sentences) == len(self.spa_sentences), "Mismatch in number of sentences"
        return len(self.eng_sentences)

    def __getitem__(self, idx):
        eng_sentence = self.eng_sentences[idx]
        spa_sentence = self.spa_sentences[idx]
        
        eng_indices = [self.eng_word2idx.get(word, self.eng_word2idx['<unk>']) for word in eng_sentence.split()]
        spa_indices = [self.spa_word2idx.get(word, self.spa_word2idx['<unk>']) for word in spa_sentence.split()]
        
        return torch.tensor(eng_indices), torch.tensor(spa_indices)


In [110]:

def collate_fun(batch):
    eng_batch , spa_batch = zip(*batch)
    eng_batch = [seq[:MAX_SEQ_LEN].clone().detach() for seq in eng_batch]
    spa_batch = [seq[:MAX_SEQ_LEN].clone().detach() for seq in spa_batch]
    eng_batch = torch.nn.utils.rnn.pad_sequence(eng_batch, batch_first=True, padding_value=0)
    spa_batch = torch.nn.utils.rnn.pad_sequence(spa_batch, batch_first=True, padding_value=0)
    return eng_batch, spa_batch




In [111]:
torch.manual_seed(23)

<torch._C.Generator at 0x73ee53fa9310>

In [112]:
for i in range(10):
    print(eng_sentence[i])
    print(spa_sentence[i])
    print()

Let's try something.
¡Intentemos algo!

I have to go to sleep.
Tengo que irme a dormir.

Today is June 18th and it is Muiriel's birthday!
¡Hoy es 18 de junio y es el cumpleaños de Muiriel!

Today is June 18th and it is Muiriel's birthday!
¡Hoy es el 18 de junio y es el cumpleaños de Muiriel!

Muiriel is 20 now.
Ahora, Muiriel tiene 20 años.

Muiriel is 20 now.
Muiriel tiene 20 años ahora.

The password is "Muiriel".
La contraseña es "Muiriel".

I will be back soon.
Volveré pronto.

I will be back soon.
Vuelvo en seguida.

I will be back soon.
Yo regresaré pronto.



## Select device

In [113]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


## 🔢 Default values in the paper

# 🤖 Build Model

In [114]:
class PositionalEncoding(nn.Module):
  def __init__(self, d_model, max_len):
    super().__init__()
    self.pos_embed_matrix = torch.zeros(max_len, d_model, device= device) # (max_len, d_model)
    token_pos = torch.arange(0, max_len).unsqueeze(1) #(max_len, 1)
    div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)) # (d_model//2,)
    self.pos_embed_matrix[:, 0::2] = torch.sin(token_pos * div_term)
    self.pos_embed_matrix[:, 1::2] = torch.cos(token_pos * div_term)
    self.pos_embed_matrix = self.pos_embed_matrix.unsqueeze(0) # (1, max_len, d_model)

  def forward(self, x):
    return x + self.pos_embed_matrix[:, :x.size(1), :]


# how function the multiplication token_pos * div_term
# [[0],
#  [1],
#  [2]] *
# [ a, b, c ]  =>

# [[0*a, 0*b, 0*c],
#  [1*a, 1*b, 1*c],
#  [2*a, 2*b, 2*c]]

# the shape final is (max_len, d_model//2)

In [115]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, h):
    super().__init__()
    self.d_model = d_model
    self.h = h
    self.d_k = d_model // h
    self.d_v = d_model // h
    self.W_q = nn.Linear(d_model, d_model)
    self.W_k = nn.Linear(d_model, d_model)
    self.W_v = nn.Linear(d_model, d_model)
    self.W_o = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)
    self.scale = math.sqrt(self.d_k)

  def forward(self, Q, K, V, mask):
    batch_size = Q.size(0)
    # (batch_size, seq_len, d_model)
    Q = self.W_q(Q)
    K = self.W_k(K)
    V = self.W_v(V)

    # (batch_size, seq_len, d_model) -> (batch_size, seq_len, h, d_k)
    Q = Q.view(batch_size, -1, self.h, self.d_k)
    K = K.view(batch_size, -1, self.h, self.d_k)
    V = V.view(batch_size, -1, self.h, self.d_k)

    # (batch_size, seq_len, h, d_k) => (batch_size, h, seq_len, d_k)
    Q = Q.transpose(1, 2)
    K = K.transpose(1, 2)
    V = V.transpose(1, 2)

    attended_values, _ = self.scaled_dot_product(Q, K, V, mask)
    attended_values = attended_values.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
    attended_values = self.W_o(attended_values)
    return attended_values




  def scaled_dot_product(self, Q, K, V, mask):
    scores = torch.matmul(Q, K.transpose(-2, -1))/ math.sqrt(self.d_k)
    if mask is not None:
      scores = scores.masked_fill(mask == 0, -1e9)
    attention  = F.softmax(scores, dim=-1)

    attended_values = torch.matmul(attention, V)
    return attended_values, attention



In [116]:
class FeedForward(nn.Module):
  def __init__(self, d_model, d_ff):
    super().__init__()
    self.linear1 = nn.Linear(d_model, d_ff)
    self.linear2 = nn.Linear(d_ff, d_model)
  def forward(self, x):
    x = self.linear1(x)
    x = F.relu(x)
    x = self.linear2(x)
    return x


In [117]:
class EncoderLayer(nn.Module):
  def __init__(self, d_model, h, d_ff, dropout):
    super().__init__()
    self.self_attention = MultiHeadAttention(d_model, h)
    self.ff = FeedForward(d_model, d_ff)
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
  def forward(self, x, mask):
    attention = self.self_attention(x, x, x, mask)
    x = x + self.dropout1(attention)
    x = self.norm1(x)

    x = x + self.dropout2(self.ff(x))
    x = self.norm2(x)
    return x




In [118]:
class Encoder(nn.Module):
  def __init__(self, N, d_model, h, d_ff, dropout):
    super().__init__()
    self.layers = nn.ModuleList(
        [EncoderLayer(d_model, h, d_ff, dropout) for _ in range(N)])
    self.norm = nn.LayerNorm(d_model)
  def forward(self, x, mask=None):
    for layer in self.layers:
      x = layer(x, mask)
    return self.norm(x)

In [119]:
class DecoderLayer(nn.Module):
  def __init__(self, d_model, h, d_ff, dropout):
    super().__init__()
    self.self_attention = MultiHeadAttention(d_model, h)
    self.cross_attention = MultiHeadAttention(d_model, h)
    self.ff = FeedForward(d_model, d_ff)
    # this norm the embeddgings of each token
    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.dropout3 = nn.Dropout(dropout)

  def forward(self, x, encoder_output, source_mask, target_mask):
    attention = self.self_attention(x, x, x, target_mask)
    x = x + self.dropout1(attention)
    x = self.norm1(x)

    encoder_decoder_attn = self.cross_attention(x, encoder_output, encoder_output, source_mask)
    x = x + self.dropout2(encoder_decoder_attn)
    x = self.norm2(x)

    x = self.ff(x)
    x = x + self.dropout3(x)
    x = self.norm3(x)
    return x




In [120]:
class Decoder(nn.Module):
  def __init__(self, N, d_model, h, d_ff, dropout):
    super().__init__()
    self.layers = nn.ModuleList(
        [DecoderLayer(d_model, h, d_ff, dropout) for _ in range(N)]
    )
    self.norm = nn.LayerNorm(d_model)

  def forward(self, x, encoder_output, target_mask, source_mask):
    for layer in self.layers:
      x = layer(x, encoder_output, source_mask, target_mask)
    return self.norm(x)

In [121]:
class Transformer(nn.Module):
  def __init__(self, N, d_model, h, d_ff, dropout, input_vocab_size,
               output_vocab_size, max_len):
    super().__init__()
    self.d_model = d_model
    self.input_embedding = nn.Embedding(input_vocab_size, d_model)
    self.ouput_embedding = nn.Embedding(output_vocab_size, d_model)
    self.positional_encoding = PositionalEncoding(d_model, max_len)
    self.encoder = Encoder(N, d_model, h, d_ff,dropout)
    self.decoder = Decoder(N, d_model, h, d_ff,dropout)
    self.fc_out = nn.Linear(d_model, output_vocab_size)

  def forward(self, source, target):
    # Masks
    source_mask, target_mask = self.create_masks(source, target)
    # Before encoder
    source = self.input_embedding(source) * math.sqrt(self.d_model)
    source = self.positional_encoding(source)

    # Encoder
    encoder_output = self.encoder(source, source_mask)
    # Before decoder
    target = self.ouput_embedding(target) * math.sqrt(self.d_model)
    target = self.positional_encoding(target)
    # Decoder
    output = self.decoder(target, encoder_output, target_mask, source_mask)

    return self.fc_out(output)

  def create_masks(self, source, target):
    source_mask = (source != 0).unsqueeze(1).unsqueeze(2)
    target_mask = (target != 0).unsqueeze(1).unsqueeze(2)

    # Number of words
    size_target = target.size(1)
    # Avoid seeing future words
    no_mask = torch.tril(torch.ones(1, size_target, size_target)).bool().to(device)
    # No attention in to the future and no attention in paddigns
    target_mask = target_mask & no_mask
    return source_mask, target_mask

## Simple text

In [122]:
# Number of Encoders and Decoders in the transformer
N = 6
# Size of embedding
d_model = 512
# Number of heads in the Multi-Head-Attention
h = 8
# Output size in each Linear in the Multi-Head-Attention
d_k = d_model // h
d_v = d_model // h
# Size hidden layer in Feed Forward
d_ff = 2048
# Context size
max_len = 32
# Vocabulary size
#vocab_size = 1000


# Dropout value
dropout = 0.1

In [123]:
seq_len_source = 10
seq_len_target = 20 
input_vocab_size = eng_vocab_size
output_vocab_size = spa_vocab_size


In [124]:
def train(model, dataloader, loss_function, optimizer, epochs):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for i, (eng_batch, spa_batch) in enumerate(dataloader):
            eng_batch = eng_batch.to(device)
            spa_batch = spa_batch.to(device)
            
            # Decoder preprocessing
            target_input = spa_batch[:, :-1]
            target_output = spa_batch[:, 1: ].contiguous().view(-1)

            # Zero grads
            optimizer.zero_grad()
            
            output = model(eng_batch, target_input)
            output = output.view(-1, output.size(-1))

            loss = loss_function(output, target_output)
            # gradient and update parameters
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(dataloader)}')

In [125]:
BATCH_SIZE = 16
dataset = EngSpaDataset(eng_sentences, spa_sentences, eng_word2idx, spa_word2idx)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fun)

loss_function = nn.CrossEntropyLoss(ignore_index=0)
model = Transformer(N, d_model, h, d_ff, dropout, input_vocab_size, output_vocab_size, MAX_SEQ_LEN)
# model = Transformer(d_model=512, num_heads=8, d_ff=2048, num_layers=6,
#                     input_vocab_size=eng_vocab_size, target_vocab_size=spa_vocab_size,
#                     max_len=MAX_SEQ_LEN, dropout=0.1)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
train(model, dataloader, loss_function, optimizer, epochs = 10)