# Transformer - Attention is all you need

## Configuration


### Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from collections import Counter
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import re
import math

### Device

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## Global Vars

In [3]:
MAX_SEQ_LEN = 128
# Semilla
torch.manual_seed(23)

<torch._C.Generator at 0x7f6d91664310>

---
## Class

### PositionalEmbedding

In [4]:
class PositionalEmbedding(nn.Module):
    def __init__ (self, d_model, max_seq_len=MAX_SEQ_LEN):
        super().__init__()
        
        self.pos_embed_matrix = torch.zeros(max_seq_len, d_model, device=device)
        token_pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() 
                             * (-math.log(10000.0) / d_model))
        self.pos_embed_matrix[:, 0::2] = torch.sin(token_pos * div_term)
        self.pos_embed_matrix[:, 1::2] = torch.cos(token_pos * div_term)
        self.pos_embed_matrix = self.pos_embed_matrix.unsqueeze(0).transpose(0, 1)
        
        
    def forward(self, x):
        return x + self.pos_embed_matrix[:x.size(0), :]


### PositionFeedForward

In [5]:
class PositionFeedForward(nn.Module):
    def __init__(self, d_model, dim_feedforward):
        super().__init__()
        
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
    
    
    def forward(self, x):
        return self.linear2(F.gelu(self.linear1(x)))

### MultiHeadAttention

In [6]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model=512, n_heads=8):
        super().__init__()
        
        assert d_model % n_heads == 0, 'Embedding size not compatible with number of heads'
        
        self.d_v = d_model // n_heads
        self.d_k = self.d_v
        self.n_heads = n_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)    
    
    
    def forward(self, Q, K, V, mask=None):
        '''
        Q, K, V: [batch_size, seq_len, d_model(n_heads*d_k)]
        after transpose Q, K, V: [batch_size, n_heads, seq_len, d_k]
        '''
        batch_size = Q.size(0)
        
        Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        
        weighted_values, attention = self.scaled_dot_product(Q, K, V, mask)
        
        weighted_values = weighted_values.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_k)
        weighted_values = self.W_o(weighted_values)
        
        return weighted_values, attention
        
        
        
    def scaled_dot_product(self, Q, K, V, mask=None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = F.softmax(scores, dim=-1)
        weighted_values = torch.matmul(attention, V)
        
        return weighted_values, attention
            
        

### Encoder

#### EncoderSubLayer

In [7]:
class EncoderSubLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = PositionFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.droupout1 = nn.Dropout(dropout)
        self.droupout2 = nn.Dropout(dropout)
    
    def forward(self, x, mask = None):
        attention_score, _ = self.self_attn(x, x, x, mask)
        x = x + self.droupout1(attention_score)
        x = self.norm1(x)
        x = x + self.droupout2(self.ffn(x))
        return self.norm2(x)
    

#### Encoder

In [8]:
class Encoder(nn.Module):
    def __init__(self, d_model, n_head, dim_feedforward, num_layers, dropout=0.1):
         super().__init__()
         
         self.layers = nn.ModuleList([EncoderSubLayer(d_model, n_head, dim_feedforward, dropout) for _ in range(num_layers)])
         self.norm = nn.LayerNorm(d_model)
         
    
    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
            
        return self.norm(x)

### Decoder


#### DecoderSubLayer

In [9]:
class DecoderSubLayer(nn.Module):
    def __init__(self, d_model, n_heads, dim_feedforward, dropout=0.1):
        super().__init__()
        
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.cross_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = PositionFeedForward(d_model, dim_feedforward)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        
        
    def forward(self, x, encoder_output, target_mask=None, encoder_mask=None):
        attention_score, _ = self.self_attn(x, x, x, mask=target_mask)
        x = x + self.dropout1(attention_score)
        x = self.norm1(x)
        
        encoder_attention_score, _ = self.cross_attn(x, encoder_output, encoder_output, mask=encoder_mask)
        x = x + self.dropout2(encoder_attention_score)
        x = self.norm2(x)
        
        feed_forward_output = self.feed_forward(x)
        x = x + self.dropout3(feed_forward_output)
        x = self.norm3(x)
        
        return x
        

#### Decoder

In [10]:
class Decoder(nn.Module):
    def __init__ (self, d_model, n_head, dim_feedforward, num_layers, dropout=0.1):
        super().__init__()
        
        self.layers = nn.ModuleList([DecoderSubLayer(d_model, n_head, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        
    
    def forward(self, x, encoder_output, target_mask, encoder_mask):
        for layer in self.layers:
            x = layer(x, encoder_output, target_mask, encoder_mask)
            
        return self.norm(x)

### Transformer

In [11]:
class Transformer(nn.Module):
    def __init__(
                self, 
                d_model, 
                n_head, 
                dim_feedforward, 
                num_layers, 
                input_vocab_size,
                target_vocab_size,
                max_seq_len=MAX_SEQ_LEN,
                dropout=0.1):
        
        super().__init__()
        
        self.encoder_embedding = nn.Embedding(input_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(target_vocab_size, d_model)
        self.pos_embedding = PositionalEmbedding(d_model, max_seq_len)
        self.encoder = Encoder(d_model, n_head, dim_feedforward, num_layers, dropout)
        self.decoder = Decoder(d_model, n_head, dim_feedforward, num_layers, dropout)
        self.output_layer = nn.Linear(d_model, target_vocab_size)
    
    
    def forward(self, source, target):
        # Enconder mask
        encoder_mask, target_mask = self.mask(source, target)
        # Embedding and positional embedding
        source = self.encoder_embedding(source) * math.sqrt(self.encoder_embedding.embedding_dim)
        source = self.pos_embedding(source)
        # Encoder
        encoder_output = self.encoder(source, encoder_mask)
        
        # Decoder embedding and positional encoding
        target = self.decoder_embedding(target) * math.sqrt(self.encoder_embedding.embedding_dim)
        target = self.pos_embedding(target)
        # Decoder
        decoder_output = self.decoder(target, encoder_output, target_mask, encoder_mask)
        
        return self.output_layer(decoder_output)
        
    
    def mask(self, source, target):
        source_mask = (source != 0).unsqueeze(1).unsqueeze(2)
        target_mask = (target != 0).unsqueeze(1).unsqueeze(2)
        size = target.size(1)
        no_mask = torch.tril(torch.ones((1, size, size), device=device)).bool()
        target_mask = target_mask & no_mask
        
        return source_mask, target_mask

---
# Simple test

In [12]:
seq_len_source = 10
seq_len_target = 10
batch_size = 2
input_vocab_size = 50
target_vocab_size = 50

source = torch.randint(1, input_vocab_size, (batch_size, seq_len_source))
target = torch.randint(1, target_vocab_size, (batch_size, seq_len_target))

d_model = 512
n_heads = 8
d_ff = 2048
num_layers = 6

model = Transformer(d_model, 
                n_heads, 
                d_ff, 
                num_layers, 
                input_vocab_size,
                target_vocab_size,
                max_seq_len=MAX_SEQ_LEN,
                dropout=0.1)


In [13]:
model = model.to(device)
source = source.to(device)
target = target.to(device)

In [14]:
output = model(source, target)

In [15]:
# Expected output shape -> [batch, seq_len_target, target_vocab_size] i.e. [2, 10, 50]
print(f'ouput.shape {output.shape}')

ouput.shape torch.Size([2, 10, 50])


---

---
# English to Spanish Translation -  Example

## Reading the data

In [16]:
PATH = "../Transformer/resources/eng-spa.txt"

with open(PATH, 'r', encoding='utf-8')  as f:
    lines = f.readlines()
    
eng_spa_pairs = [line.strip().split('\t') for line in lines if '\t' in line]

eng_spa_pairs[:10]

[['OK.', '¡Órale!'],
 ['No.', 'No.'],
 ['Go!', '¡Ya!'],
 ['No!', '¡No!'],
 ['Go!', '¡Fuera!'],
 ['Ah!', '¡Anda!'],
 ['Ow!', '¡Ay!'],
 ['Go!', '¡Sal!'],
 ['Hi.', '¡Hola!'],
 ['Go.', 'Vete.']]

## Prepare the data

In [17]:
eng_sentences = [pair[0] for pair in eng_spa_pairs]
spa_sentences = [pair[1] for pair in eng_spa_pairs]

print(eng_sentences[:5])
print(spa_sentences[:5])

['OK.', 'No.', 'Go!', 'No!', 'Go!']
['¡Órale!', 'No.', '¡Ya!', '¡No!', '¡Fuera!']


## Functions

In [18]:
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r'[" "]+', ' ', sentence)
    sentence = re.sub(r'[á]+', 'a', sentence)
    sentence = re.sub(r'[é]+', 'e', sentence)
    sentence = re.sub(r'[í]+', 'i', sentence)
    sentence = re.sub(r'[ó]+', 'o', sentence)
    sentence = re.sub(r'[ú]+', 'u', sentence)
    sentence = re.sub(r'[^a-z]+', ' ', sentence)
    sentence = sentence.strip()
    sentence = '<sos> ' + sentence + ' <eos>'
    return sentence

s1 = "¿Hola cómo estás?1@2"
print(s1)
print(preprocess_sentence(s1))

¿Hola cómo estás?1@2
<sos> hola como estas <eos>


In [19]:
eng_sentences = [preprocess_sentence(sentence) for sentence in eng_sentences]
spa_sentences = [preprocess_sentence(sentence) for sentence in spa_sentences]

print(eng_sentences[:5])
print(spa_sentences[:5])

['<sos> ok <eos>', '<sos> no <eos>', '<sos> go <eos>', '<sos> no <eos>', '<sos> go <eos>']
['<sos> orale <eos>', '<sos> no <eos>', '<sos> ya <eos>', '<sos> no <eos>', '<sos> fuera <eos>']


In [20]:
def build_vocab(sentences):
    words = [word for sentence in sentences for word in sentence.split()]
    word_count = Counter(words)
    
    sorted_word_counts = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
    
    word2idx = {word: idx for idx, (word, _) in enumerate(sorted_word_counts, 2)}
    word2idx['<pad>'] = 0
    word2idx['<unk>'] = 1
    
    idx2word = {idx: word for word, idx in word2idx.items()}
    
    return word2idx, idx2word
    
    

In [21]:
eng_word2idx, eng_idx2word = build_vocab(eng_sentences)
spa_word2idx, spa_idx2word = build_vocab(spa_sentences)

In [22]:
eng_vocab_size = len(eng_word2idx)
spa_vocab_size = len(spa_word2idx)

print(f'Vocabularies sizes: {eng_vocab_size}, {spa_vocab_size}')

Vocabularies sizes: 27933, 47339


## Dataset - Class

In [23]:
class EngSpaDataset(Dataset):
    def __init__(self, eng_sentences, spa_sentences, eng_word2idx, spa_word2idx):
        self.eng_sentences = eng_sentences
        self.spa_sentences = spa_sentences
        self.eng_word2idx = eng_word2idx
        self.spa_word2idx = spa_word2idx
        
    
    def __len__(self):
        return len(self.eng_sentences)
    
    def __getitem__(self, idx):
        eng_sentence = self.eng_sentences[idx]
        spa_sentence = self.spa_sentences[idx]
        
        # return tokens idxs
        eng_idxs = [ self.eng_word2idx.get(word, self.eng_word2idx['<unk>']) for word in eng_sentence.split()]
        spa_idxs = [ self.spa_word2idx.get(word, self.spa_word2idx['<unk>']) for word in spa_sentence.split()]
        
        return torch.tensor(eng_idxs), torch.tensor(spa_idxs)
        

## Collate Function

In [24]:
def collate_fn(batch):
    eng_batch, spa_batch = zip(*batch)
    
    eng_batch =  [seq[:MAX_SEQ_LEN].clone().detach() for seq in eng_batch]
    spa_batch =  [seq[:MAX_SEQ_LEN].clone().detach() for seq in spa_batch]
    
    eng_batch = torch.nn.utils.rnn.pad_sequence(eng_batch, batch_first=True, padding_value=0)
    spa_batch = torch.nn.utils.rnn.pad_sequence(spa_batch, batch_first=True, padding_value=0)
    
    return eng_batch, spa_batch

## Train Function

In [25]:
def train(model, dataloader, loss_function, optimizer, epochs):
    model.train()
    
    for epoch in range(epochs):
        total_loss = 0
        for i, (eng_batch, spa_batch) in enumerate(dataloader):
            eng_batch = eng_batch.to(device)
            spa_batch = spa_batch.to(device)
            
            # Decoder preprocessing
            target_input = spa_batch[:, :-1]                        # <eos>
            target_output = spa_batch[:, 1:].contiguous().view(-1)  # <sos>
            
            # Zero grads
            optimizer.zero_grad()
            
            # run model
            output = model(eng_batch, target_input)
            output = output.view(-1, output.size(-1)) 
            
            # loss
            loss = loss_function(output, target_output)
            
            #gradient and update parameters
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            
        # print loss per epoch
        print(f'Epoch {epoch + 1}/{epochs}: Loss: {total_loss / len(dataloader)}')

## Train

In [26]:
BATCH_SIZE = 64
dataset = EngSpaDataset(eng_sentences, spa_sentences, eng_word2idx, spa_word2idx)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)


In [27]:
model = Transformer(d_model=512,
                    n_head=8, 
                    dim_feedforward=2048,
                    num_layers=6,
                    input_vocab_size=eng_vocab_size,
                    target_vocab_size=spa_vocab_size,
                    max_seq_len=MAX_SEQ_LEN,
                    dropout=0.1,
                    )

In [28]:
model = model.to(device)
loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [29]:
train(model, dataloader, loss_function, optimizer, epochs=10)

Epoch 1/10: Loss: 3.5323983691421486
Epoch 2/10: Loss: 2.152894883446522
Epoch 3/10: Loss: 1.6524279881515214
Epoch 4/10: Loss: 1.3204779436350544
Epoch 5/10: Loss: 1.0641821615947218
Epoch 6/10: Loss: 0.8580102765675641
Epoch 7/10: Loss: 0.6916081658819785
Epoch 8/10: Loss: 0.5674362089942669
Epoch 9/10: Loss: 0.47832796518428566
Epoch 10/10: Loss: 0.4168320750581745


## Auxiliary functions

In [30]:
def sentence_to_indices(sentence, word2idx):
    return [word2idx.get(word, word2idx['<unk>']) for word in sentence.split()]

def indices_to_sentence(indices, idx2word):
    return ' '.join([idx2word[idx] for idx in indices if idx in idx2word and idx2word[idx] != '<pad>'])

def translate_sentence(model, sentence, eng_word2idx, spa_idx2word, max_len=MAX_SEQ_LEN, device='cpu'):
    model.eval()
    sentence = preprocess_sentence(sentence)
    input_indices = sentence_to_indices(sentence, eng_word2idx)
    input_tensor = torch.tensor(input_indices).unsqueeze(0).to(device)

    # Initialize the target tensor with <sos> token
    tgt_indices = [spa_word2idx['<sos>']]
    tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(device)

    with torch.no_grad():
        for _ in range(max_len):
            output = model(input_tensor, tgt_tensor)
            output = output.squeeze(0)
            next_token = output.argmax(dim=-1)[-1].item()
            tgt_indices.append(next_token)
            tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0).to(device)
            if next_token == spa_word2idx['<eos>']:
                break

    return indices_to_sentence(tgt_indices, spa_idx2word)

In [31]:
def evaluate_translations(model, sentences, eng_word2idx, spa_idx2word, max_len=MAX_SEQ_LEN, device='cpu'):
    for sentence in sentences:
        translation = translate_sentence(model, sentence, eng_word2idx, spa_idx2word, max_len, device)
        print(f'Input sentence: {sentence}')
        print(f'Traducción: {translation}')
        print()


In [33]:
# Example sentences to test the translator
test_sentences = [
    "Hello, how are you?",
    "I am learning artificial intelligence.",
    "Artificial intelligence is great.",
    "Good night!", 
    "The quick brown fox jumps over the lazy dog.",
    "Artificial intelligence is transforming industries rapidly.",
    "She sells seashells by the seashore.",
    "Despite the rain, the soccer match continued uninterrupted.",
    "Quantum computing will revolutionize technology in the coming decades.",
    "The chef cooked an amazing dinner for the guests.",
    "Travel broadens the mind and enriches the soul.",
    "Never underestimate the power of a simple act of kindness.",
    "The new movie received excellent reviews from critics.",
    "He couldn't remember where he had left his keys."   
    
]

# Assuming the model is trained and loaded
# Set the device to 'cpu' or 'cuda' as needed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Evaluate translations
evaluate_translations(model, test_sentences, eng_word2idx, spa_idx2word, max_len=MAX_SEQ_LEN, device=device)

Input sentence: Hello, how are you?
Traducción: <sos> hola como estas <eos>

Input sentence: I am learning artificial intelligence.
Traducción: <sos> estoy aprendiendo inteligencia artificial <eos>

Input sentence: Artificial intelligence is great.
Traducción: <sos> la inteligencia artificial es genial <eos>

Input sentence: Good night!
Traducción: <sos> buenas noches <eos>

Input sentence: The quick brown fox jumps over the lazy dog.
Traducción: <sos> el rapido zorro marron salta por encima del perro vago <eos>

Input sentence: Artificial intelligence is transforming industries rapidly.
Traducción: <sos> la inteligencia artificial es una inteligencia artificial de las industrias rapidamente <eos>

Input sentence: She sells seashells by the seashore.
Traducción: <sos> ella vende conchas en la costa <eos>

Input sentence: Despite the rain, the soccer match continued uninterrupted.
Traducción: <sos> pese a pesar de la lluvia el partido de futbol <eos>

Input sentence: Quantum computing w