# Requirements

In [1]:
!pip install torchinfo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchinfo
  Downloading torchinfo-1.7.2-py3-none-any.whl (22 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.7.2


# Imports

In [2]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchinfo import summary
import numpy as np
import math
import warnings
warnings.filterwarnings("ignore")

# IO Embedding

![picture](https://miro.medium.com/v2/resize:fit:720/format:webp/1*2vyKzFlzIHfSmOU_lnQE4A.png)

In [3]:
class IO_Embedding(nn.Module):
    """Token-embedding lookup shared by the source and target sides.

    Args:
        vocab_size: number of rows in the embedding table.
        d_model: size of each embedding vector.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__()

        # One learnable d_model-sized vector per vocabulary index.
        self.embed = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Map token indices to their embedding vectors.

        x:       [batch_size, seq_len_SRC/TRG]
        returns: [batch_size, seq_len_SRC/TRG, d_model]
        """
        embedded = self.embed(x)
        return embedded

![picture](https://miro.medium.com/v2/resize:fit:524/format:webp/1*yWGV9ck-0ltfV2wscUeo7Q.png)

![picture](https://miro.medium.com/v2/resize:fit:564/format:webp/1*SgNlyFaHH8ljBbpCupDhSQ.png)

In [4]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to scaled token embeddings.

    The table is precomputed once for `max_seq_len` positions and stored as a
    (non-trainable) buffer, so it moves with the module on `.to(device)`.

    Why max_seq_len?
      The source and target sequences may have different lengths, and this
      class is used for both. The table must therefore cover the longest
      sequence that will ever be passed to `forward`.

    Args:
        d_model: embedding size (odd values are supported; the last column
            then only receives the sine component).
        max_seq_len: number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_len):
        super().__init__()

        self.d_model = d_model

        # Computed in float64 so the values track the scalar formula
        #   PE[pos, i]     = sin(pos / 10000 ** (i / d_model))
        #   PE[pos, i + 1] = cos(pos / 10000 ** (i / d_model))   for even i
        # before being stored in the default floating dtype.
        positions = torch.arange(max_seq_len, dtype=torch.float64).unsqueeze(1)
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float64)
        angles = positions / torch.pow(10000.0, even_dims / d_model)
        # angles: [max_seq_len, ceil(d_model / 2)]

        positional_emb = torch.zeros(max_seq_len, d_model)
        positional_emb[:, 0::2] = torch.sin(angles).to(positional_emb.dtype)
        # For odd d_model there is one fewer odd column than even columns.
        positional_emb[:, 1::2] = torch.cos(angles[:, : d_model // 2]).to(positional_emb.dtype)

        # Buffers are not parameters: they are saved in the state dict but
        # never receive gradients.
        self.register_buffer('positional_emb', positional_emb)

    def forward(self, x):
        """Scale the embeddings and add the positional table.

        The embeddings are multiplied by sqrt(d_model) before the addition so
        that the positional encoding is relatively smaller and does not drown
        out the information carried by the embedding vector.

        x:       [batch_size, seq_len_SRC/TRG, d_model]
        returns: [batch_size, seq_len_SRC/TRG, d_model]

        Raises:
            ValueError: if the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        max_seq_len = self.positional_emb.size(0)
        if seq_len > max_seq_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {max_seq_len}"
            )

        x = x * math.sqrt(self.d_model)
        # positional_emb[:seq_len, :] is [seq_len, d_model] and broadcasts
        # over the batch dimension.
        return x + self.positional_emb[:seq_len, :]

# Attention

![picture](https://miro.medium.com/v2/resize:fit:750/format:webp/1*1tsRtfaY9z6HxmERYhw8XQ.png)

V, K and Q stand for ‘value’, ‘key’ and ‘query’. These are terms used in attention functions, but honestly, I don’t think explaining this terminology is particularly important for understanding the model.

In the case of the Encoder, V, K and Q will simply be identical copies of the embedding vector (plus positional encoding). They will have the dimensions batch_size * seq_len * d_model.

In multi-head attention we split the embedding vector into N heads, so they will then have the dimensions batch_size * N * seq_len * (d_model / N).

This final dimension (d_model / N ) we will refer to as d_k.



![pictures](https://miro.medium.com/v2/resize:fit:224/format:webp/1*15E9qKg9bKnWdSRWCyY2iA.png)
![pictures](https://miro.medium.com/v2/resize:fit:640/format:webp/1*evdACdTOBT5j1g1nXialBg.png)


In [5]:
def attention(q, k, v, d_k, mask=None, dropout=None):
    """Scaled dot-product attention: softmax(q @ k^T / sqrt(d_k)) @ v.

    Shapes:
      (1) self-attention (first attention layer in encoder and decoder):
            q, k, v: [batch_size, N, seq_len_SRC/TRG, d_k]
      (2) encoder-decoder attention (middle attention layer in decoder):
            k, v: [batch_size, N, seq_len_SRC, d_k]
            q:    [batch_size, N, seq_len_TRG, d_k]

    Args:
        q, k, v: query, key and value tensors (see shapes above).
        d_k: per-head dimension used to scale the scores.
        mask: optional tensor; positions where `mask == 0` are excluded from
            the softmax. A 3-D mask ([batch_size, q_len or 1, k_len]) gets a
            head dimension inserted; any other rank is used as-is and must
            broadcast against the score tensor.
        dropout: optional callable applied to the attention weights.

    Returns:
        (1) [batch_size, N, seq_len_SRC/TRG, d_k]
        (2) [batch_size, N, seq_len_TRG, d_k]
    """
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    #(1) scores: [batch_size, N, seq_len_SRC/TRG, seq_len_SRC/TRG]
    #(2) scores: [batch_size, N, seq_len_TRG, seq_len_SRC]

    if mask is not None:
        if mask.dim() == 3:
            # Insert the head dimension so one mask is shared by all heads.
            # Masks of another rank (e.g. already per-head) are left alone
            # instead of being pushed to a mis-broadcasting 5-D shape.
            mask = mask.unsqueeze(1)
        # -1e9 for float32/float64 (unchanged behaviour); clamped to the most
        # negative finite value for narrower dtypes such as float16, where
        # -1e9 is not representable.
        fill_value = max(-1e9, torch.finfo(scores.dtype).min)
        scores = scores.masked_fill(mask == 0, fill_value)

    weights = F.softmax(scores, dim=-1)

    if dropout is not None:
        weights = dropout(weights)

    return torch.matmul(weights, v)

In [6]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on the module-level `attention` function.

    Args:
        heads: number of attention heads (N).
        d_model: model width; must be divisible by `heads`.
        dropout: dropout probability applied to the attention weights.

    Raises:
        ValueError: if `d_model` is not a positive multiple of `heads`.
    """

    def __init__(self, heads, d_model, dropout = 0.1):
        super().__init__()

        # Without this check d_k * heads != d_model and the `view` calls in
        # forward() either fail or silently mix sequence positions.
        if heads <= 0 or d_model % heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by heads ({heads})"
            )

        self.N = heads
        self.d_model = d_model
        self.d_k = d_model // heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def _split_heads(self, x, batch_size):
        """[batch_size, seq_len, d_model] -> [batch_size, N, seq_len, d_k]."""
        return x.view(batch_size, -1, self.N, self.d_k).permute(0, 2, 1, 3)

    def forward(self, q, k, v, mask=None):
        """Project q/k/v, attend per head, then merge the heads.

        (1) self-attention (first attention layer in encoder and decoder):
              q, k, v: [batch_size, seq_len_SRC/TRG, d_model]
        (2) encoder-decoder attention (middle attention layer in decoder):
              k, v: [batch_size, seq_len_SRC, d_model]
              q:    [batch_size, seq_len_TRG, d_model]

        Returns a tensor with the query's shape:
        (1) [batch_size, seq_len_SRC/TRG, d_model]
        (2) [batch_size, seq_len_TRG, d_model]
        """
        batch_size = q.size(0)

        k = self._split_heads(self.k_linear(k), batch_size)
        q = self._split_heads(self.q_linear(q), batch_size)
        v = self._split_heads(self.v_linear(v), batch_size)
        #(1) q, k, v : [batch_size, N, seq_len_SRC/TRG, d_k]
        #(2) k, v : [batch_size, N, seq_len_SRC, d_k] ---- q: [batch_size, N, seq_len_TRG, d_k]

        scores = attention(q, k, v, self.d_k, mask, self.dropout)
        #(1) scores: [batch_size, N, seq_len_SRC/TRG, d_k]
        #(2) scores: [batch_size, N, seq_len_TRG, d_k]

        # Move the head axis back next to d_k and flatten the two into d_model.
        concat = scores.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.d_model)
        #(1) concat: [batch_size, seq_len_SRC/TRG, d_model]
        #(2) concat: [batch_size, seq_len_TRG, d_model]

        return self.out(concat)

# Norm

![picture](https://github.com/hyunwoongko/transformer/raw/master/image/layer_norm.jpg)

In [7]:
class Norm(nn.Module):
    """Layer normalisation over the last dimension with learnable scale/shift.

    Args:
        d_model: size of the last (normalised) dimension.
        eps: constant added to the variance before the square root.
    """

    def __init__(self, d_model, eps = 1e-6):
        super().__init__()

        self.d_model = d_model
        self.eps = eps

        # Learnable scale (starts at 1) and shift (starts at 0).
        self.Gamma = nn.Parameter(torch.ones(d_model))
        self.Beta = nn.Parameter(torch.zeros(d_model))

    def forward(self, x):
        """Normalise each feature vector to zero mean / unit variance.

        x:       [batch_size, seq_len, d_model]
        returns: [batch_size, seq_len, d_model]
        """
        mean = x.mean(dim=-1, keepdim=True)
        # Population (biased) variance over the feature dimension.
        variance = x.var(dim=-1, unbiased=False, keepdim=True)
        std = torch.sqrt(variance + self.eps)

        centered = x - mean
        normalised = centered / std
        return self.Gamma * normalised + self.Beta

# FF (Feed-Forward)

In [8]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: input and output width.
        d_ff: width of the hidden layer.
        dropout: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff=2048, dropout = 0.1):
        super().__init__()

        self.lin1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.lin2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply the two-layer MLP to every position independently.

        x:       [batch_size, seq_len, d_model]
        returns: [batch_size, seq_len, d_model]
        """
        hidden = F.relu(self.lin1(x))   # [batch_size, seq_len, d_ff]
        hidden = self.dropout(hidden)
        return self.lin2(hidden)        # [batch_size, seq_len, d_model]

# Encoder and Decoder Layers

![picture](https://miro.medium.com/v2/resize:fit:720/format:webp/1*2vyKzFlzIHfSmOU_lnQE4A.png)

In [9]:
class SingleEncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each followed by
    a residual connection and normalisation (Add & Norm).

    Args:
        d_model: model width.
        heads: number of attention heads.
        dropout: dropout probability used for the residual branches and also
            forwarded to the attention and feed-forward sub-modules, so one
            argument controls all dropout in the layer.
    """

    def __init__(self, d_model, heads, dropout = 0.1):

        super().__init__()

        self.norm1 = Norm(d_model)
        self.norm2 = Norm(d_model)
        # Forward `dropout` so that e.g. dropout=0.0 really disables it
        # everywhere; with the default 0.1 this matches the sub-module defaults.
        self.attention = MultiHeadAttention(heads, d_model, dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.drp1 = nn.Dropout(dropout)
        self.drp2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run one encoder layer.

        x:       [batch_size, seq_len_SRC, d_model]
        mask:    passed through to the self-attention (may be None)
        returns: [batch_size, seq_len_SRC, d_model]
        """
        attended = self.attention(x, x, x, mask)   # Attention
        x = self.norm1(x + self.drp1(attended))    # Add & Norm

        transformed = self.ff(x)                   # Feed forward
        x = self.norm2(x + self.drp2(transformed)) # Add & Norm
        return x

In [10]:
class SingleDecoderLayer(nn.Module):
    """One decoder layer: self-attention over the target, attention over the
    encoder output, then feed-forward; each followed by Add & Norm.

    Args:
        d_model: model width.
        heads: number of attention heads.
        dropout: dropout probability used for the residual branches and also
            forwarded to the attention and feed-forward sub-modules, so one
            argument controls all dropout in the layer.
    """

    def __init__(self, d_model, heads, dropout=0.1):

        super().__init__()

        self.norm1 = Norm(d_model)
        self.norm2 = Norm(d_model)
        self.norm3 = Norm(d_model)

        self.drp1 = nn.Dropout(dropout)
        self.drp2 = nn.Dropout(dropout)
        self.drp3 = nn.Dropout(dropout)

        # Forward `dropout` so that e.g. dropout=0.0 really disables it
        # everywhere; with the default 0.1 this matches the sub-module defaults.
        self.attention1 = MultiHeadAttention(heads, d_model, dropout)
        self.attention2 = MultiHeadAttention(heads, d_model, dropout)

        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, y, enc, src_mask, trg_mask):
        """Run one decoder layer.

        y:        [batch_size, seq_len_TRG, d_model]  decoder states
        enc:      [batch_size, seq_len_SRC, d_model]  encoder output
        src_mask: mask for the encoder-decoder attention (may be None)
        trg_mask: mask for the decoder self-attention (may be None)
        returns:  [batch_size, seq_len_TRG, d_model]
        """
        # Attention: Bottom (target attends to itself)
        self_attended = self.attention1(y, y, y, trg_mask)
        y = self.norm1(y + self.drp1(self_attended))   # Add & Norm

        # Attention: Middle -- queries come from the decoder, keys and values
        # from the encoder, so the result keeps the target length.
        cross_attended = self.attention2(y, enc, enc, src_mask)
        y = self.norm2(y + self.drp2(cross_attended))  # Add & Norm

        # Feed forward: Up
        transformed = self.ff(y)
        out = self.norm3(y + self.drp3(transformed))   # Add & Norm

        return out

# Encoder and Decoder

In [11]:
class Encoder(nn.Module):
    """Embedding + positional encoding followed by a stack of N encoder layers.

    Args:
        vocab_size: size of the source vocabulary.
        d_model: model width.
        N: number of stacked encoder layers.
        heads: number of attention heads per layer.
        max_seq_len: number of positions covered by the positional encoding.
    """

    def __init__(self, vocab_size, d_model, N, heads, max_seq_len):

        super().__init__()

        self.N = N  # number of encoder layers
        self.emb = IO_Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, max_seq_len)

        encoder_stack = [SingleEncoderLayer(d_model, heads) for _ in range(N)]
        self.layers = nn.ModuleList(encoder_stack)

    def forward(self, src, mask):
        """Encode a batch of source token indices.

        src:     [batch_size, seq_len_SRC]
        mask:    forwarded unchanged to every layer
        returns: [batch_size, seq_len_SRC, d_model]
        """
        hidden = self.pe(self.emb(src))
        # hidden: [batch_size, seq_len_SRC, d_model]

        for layer in self.layers:
            hidden = layer(hidden, mask)

        return hidden

In [12]:
class Decoder(nn.Module):
    """Embedding + positional encoding followed by a stack of N decoder layers.

    Args:
        vocab_size: size of the target vocabulary.
        d_model: model width.
        N: number of stacked decoder layers.
        heads: number of attention heads per layer.
        max_seq_len: number of positions covered by the positional encoding.
    """

    def __init__(self, vocab_size, d_model, N, heads, max_seq_len):

        super().__init__()

        self.N = N  # number of decoder layers
        self.emb = IO_Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, max_seq_len)

        decoder_stack = [SingleDecoderLayer(d_model, heads) for _ in range(N)]
        self.layers = nn.ModuleList(decoder_stack)

    def forward(self, trg, enc, src_mask, trg_mask):
        """Decode a batch of target token indices against the encoder output.

        trg:      [batch_size, seq_len_TRG]
        enc:      [batch_size, seq_len_SRC, d_model]
        src_mask, trg_mask: forwarded unchanged to every layer
        returns:  [batch_size, seq_len_TRG, d_model]
        """
        hidden = self.pe(self.emb(trg))
        # hidden: [batch_size, seq_len_TRG, d_model]

        for layer in self.layers:
            hidden = layer(hidden, enc, src_mask, trg_mask)

        return hidden

# Transformer

In [13]:
class Transformer(nn.Module):
    """Encoder-decoder model with a final linear projection onto the target
    vocabulary.

    Args:
        src_vocab: size of the source vocabulary.
        trg_vocab: size of the target vocabulary.
        d_model: model width.
        N: number of layers in both the encoder and the decoder.
        heads: number of attention heads per layer.
        max_seq_len: number of positions covered by the positional encodings.
    """

    def __init__(self, src_vocab, trg_vocab, d_model, N, heads, max_seq_len):

        super().__init__()

        self.encoder = Encoder(src_vocab, d_model, N, heads, max_seq_len)
        self.decoder = Decoder(trg_vocab, d_model, N, heads, max_seq_len)

        # Projects decoder states to one score per target-vocabulary entry.
        self.out = nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask=None, trg_mask=None):
        """Compute target-vocabulary scores for every target position.

        src:     [batch_size, seq_len_SRC]
        trg:     [batch_size, seq_len_TRG]
        returns: [batch_size, seq_len_TRG, trg_vocab]
        """
        memory = self.encoder(src, src_mask)
        # memory: [batch_size, seq_len_SRC, d_model]

        decoded = self.decoder(trg, memory, src_mask, trg_mask)
        # decoded: [batch_size, seq_len_TRG, d_model]

        return self.out(decoded)

# Train

In [14]:
def train(model, optim, dataloader, epochs, print_step=1):
    """Train `model` with teacher forcing and return the mean loss per epoch.

    Args:
        model: callable as `model(src, trg_input, src_mask=..., trg_mask=...)`
            returning scores of shape [batch_size, seq_len_TRG, trg_vocab].
        optim: optimizer over the model's parameters.
        dataloader: iterable of `(src, trg)` batches with
            src: [batch_size, seq_len_SRC], trg: [batch_size, seq_len_TRG_PRIME].
        epochs: number of passes over `dataloader`.
        print_step: print the loss every `print_step` epochs; 0 or None
            disables printing.

    Returns:
        list[float]: average batch loss of each epoch.

    Raises:
        ValueError: if `dataloader` yields no batches.

    NOTE(review): both masks are passed as None, so the decoder self-attention
    is not causally masked and padded positions contribute to the loss --
    confirm whether that is intended before using this beyond the toy example.
    """
    model.train()
    total_loss = []

    for epoch in range(epochs):

        running_loss, num_batches = 0.0, 0

        for src, trg in dataloader:
            # Teacher forcing: feed all tokens but the last ...
            trg_input = trg[:, :-1]
            # trg_input: [batch_size, seq_len_TRG=seq_len_TRG_PRIME-1]
            # ... and predict the same sequence shifted by one position.
            ys = trg[:, 1:].contiguous()
            # ys: [batch_size, seq_len_TRG]

            preds = model(src, trg_input, src_mask=None, trg_mask=None)
            # preds: [batch_size, seq_len_TRG, trg_vocab]

            optim.zero_grad()

            loss = F.cross_entropy(preds.reshape(-1, preds.size(-1)), ys.view(-1))
            loss.backward()
            optim.step()

            # Detach before accumulating so the running sum does not keep the
            # autograd history of every batch alive until the end of the epoch.
            running_loss = running_loss + loss.detach()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("dataloader yielded no batches; cannot compute an epoch loss")

        total_loss.append(running_loss.item() / num_batches)

        if print_step and epoch % print_step == 0:
            print(f"Epoch: {epoch+1} -> Loss: {total_loss[-1]: .8f}")

    return total_loss

# Data

In [24]:
# Toy parallel corpus. Target sentences are wrapped in the '<st>' / '<end>'
# markers that the translation loop relies on.
English_sens = [
    'i am fine',
    'you are fine',
    'he is fine',
    'they are fine',
]
Spanish_sens = [
    '<st> yo estoy bien <end>',
    '<st> tu eres bien <end>',
    '<st> el es bien <end>',
    '<st> ellos son bien <end>',
]

# Padded sequence lengths. The decoder is fed the target without its last
# token, hence the "PRIME" length minus one.
seq_len_src = 10
seq_len_trg_PRIME = 17
seq_len_trg = seq_len_trg_PRIME - 1

tokenizer_src = get_tokenizer('basic_english', language='en')
tokenizer_trg = get_tokenizer('toktok', language='es')

def create_tokens(tokenizer, dataset):
  """Lazily yield `tokenizer(sample)` for every sample in `dataset`."""
  yield from map(tokenizer, dataset)

# Source vocabulary; unknown tokens map to the index of "<oov>".
vocab_src = build_vocab_from_iterator(
    create_tokens(tokenizer_src, English_sens),
    specials=["<oov>", "<sos>"],
)
vocab_src.set_default_index(vocab_src["<oov>"])
print(f"Our vocabulary is made of {len(vocab_src)} tokens-index pairs.")

# Target vocabulary, built the same way from the Spanish sentences.
vocab_trg = build_vocab_from_iterator(
    create_tokens(tokenizer_trg, Spanish_sens),
    specials=["<oov>", "<sos>"],
)
vocab_trg.set_default_index(vocab_trg["<oov>"])
print(f"Our vocabulary is made of {len(vocab_trg)} tokens-index pairs.")

# Reverse lookups: index -> token (get_itos() is ordered by index).
idx_to_word_src = dict(enumerate(vocab_src.get_itos()))
idx_to_word_trg = dict(enumerate(vocab_trg.get_itos()))


def text_pipeline_src(x):
  """Tokenize a source sentence and convert its tokens to vocabulary indices."""
  return vocab_src(tokenizer_src(x))


def text_pipeline_trg(x):
  """Tokenize a target sentence and convert its tokens to vocabulary indices."""
  return vocab_trg(tokenizer_trg(x))

def sent_padding(sent_vec, maxlen):
  """Return `sent_vec` as a 1-D int64 tensor of exactly `maxlen` entries.

  Shorter inputs are right-padded with zeros; longer inputs are truncated on
  the right. An empty input gives an all-zero int64 tensor (rather than a
  float tensor, which an embedding lookup would reject).

  Args:
    sent_vec: sequence of token indices.
    maxlen: length of the returned tensor.
  """
  # Explicit dtype: torch infers float32 for an empty list.
  ids = torch.as_tensor(sent_vec, dtype=torch.long)[:maxlen]
  return F.pad(ids, (0, maxlen - ids.size(0)))

class MyDataset(Dataset):
  """Pairs of (source, target) sentences served as padded index tensors.

  Args:
    SRC: indexable collection of source sentences.
    TRG: indexable collection of target sentences, aligned with SRC.
    seq_len_src: padded length of each source tensor.
    seq_len_trg: padded length of each target tensor.
    device: device the returned tensors are moved to.
  """

  def __init__(self, SRC, TRG, seq_len_src, seq_len_trg, device):
    self.SRC, self.TRG = SRC, TRG
    self.seq_len_src, self.seq_len_trg = seq_len_src, seq_len_trg
    self.device = device

  def __len__(self):
    # The number of samples is defined by the source side.
    return len(self.SRC)

  def __getitem__(self, idx):
    """Return the idx-th pair as (src_tensor, trg_tensor) on `self.device`."""
    src_text = self.SRC[idx]
    trg_text = self.TRG[idx]

    src_ids = sent_padding(text_pipeline_src(src_text), maxlen=self.seq_len_src)
    trg_ids = sent_padding(text_pipeline_trg(trg_text), maxlen=self.seq_len_trg)

    return src_ids.to(self.device), trg_ids.to(self.device)

batch_size = 2
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Targets are padded to seq_len_trg_PRIME; the training loop later drops one
# token from each end to build the decoder input and the labels.
dataloader = DataLoader(
    MyDataset(
        SRC=English_sens,
        TRG=Spanish_sens,
        seq_len_src=seq_len_src,
        seq_len_trg=seq_len_trg_PRIME,
        device=device,
    ),
    batch_size=batch_size,
)

Our vocabulary is made of 10 tokens-index pairs.
Our vocabulary is made of 13 tokens-index pairs.


# Model Training

In [25]:
# Model hyper-parameters for the toy example.
src_vocab, trg_vocab = len(vocab_src), len(vocab_trg)
d_model, N, heads = 32, 1, 2
# The positional table must cover the longer of the two padded lengths.
max_seq_len = max(seq_len_src, seq_len_trg)

model = Transformer(src_vocab, trg_vocab, d_model, N, heads, max_seq_len).to(device)

# Token indices are integers, so the dummy inputs must be created as long.
summary(
    model,
    input_size=[(batch_size, seq_len_src), (batch_size, seq_len_trg)],
    dtypes=[torch.long, torch.long],
)

Layer (type:depth-idx)                        Output Shape              Param #
Transformer                                   [2, 16, 13]               --
├─Encoder: 1-1                                [2, 10, 32]               --
│    └─IO_Embedding: 2-1                      [2, 10, 32]               --
│    │    └─Embedding: 3-1                    [2, 10, 32]               320
│    └─PositionalEncoding: 2-2                [2, 10, 32]               --
│    └─ModuleList: 2-3                        --                        --
│    │    └─SingleEncoderLayer: 3-2           [2, 10, 32]               137,504
├─Decoder: 1-2                                [2, 16, 32]               --
│    └─IO_Embedding: 2-4                      [2, 16, 32]               --
│    │    └─Embedding: 3-3                    [2, 16, 32]               416
│    └─PositionalEncoding: 2-5                [2, 16, 32]               --
│    └─ModuleList: 2-6                        --                        --
│    │    └─S

In [26]:
# Training configuration.
epochs, print_step, lr = 1000, 50, 1e-3

optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

# `loss` holds the per-epoch average loss history returned by train().
loss = train(
    model=model,
    optim=optimizer,
    dataloader=dataloader,
    epochs=epochs,
    print_step=print_step,
)

Epoch: 1 -> Loss:  2.33081889
Epoch: 51 -> Loss:  0.07398581
Epoch: 101 -> Loss:  0.02989515
Epoch: 151 -> Loss:  0.01468811
Epoch: 201 -> Loss:  0.00888169
Epoch: 251 -> Loss:  0.00636232
Epoch: 301 -> Loss:  0.00446561
Epoch: 351 -> Loss:  0.00289530
Epoch: 401 -> Loss:  0.00263622
Epoch: 451 -> Loss:  0.00211905
Epoch: 501 -> Loss:  0.00196347
Epoch: 551 -> Loss:  0.00142200
Epoch: 601 -> Loss:  0.00122349
Epoch: 651 -> Loss:  0.00140041
Epoch: 701 -> Loss:  0.00105487
Epoch: 751 -> Loss:  0.00085137
Epoch: 801 -> Loss:  0.00072205
Epoch: 851 -> Loss:  0.00062553
Epoch: 901 -> Loss:  0.00060675
Epoch: 951 -> Loss:  0.00060869


# Translation

In [27]:
@torch.no_grad()
def translate(sentence, device):
  """Greedily translate `sentence` with the module-level `model`.

  Starting from '<st>', the most likely next token is appended one step at a
  time until '<end>' appears or the decoder input is full.

  Relies on these notebook-level names: `model`, `sent_padding`,
  `text_pipeline_src`, `text_pipeline_trg`, `idx_to_word_trg`,
  `seq_len_src`, `seq_len_trg`.

  Args:
    sentence: source sentence as a plain string.
    device: device on which the input tensors are placed.

  Returns:
    The generated target sentence as a string, starting with '<st>'. It ends
    with '<end>' unless the length limit was reached first.

  NOTE(review): no masks are passed to the model, mirroring the training
  call -- confirm that this is intended.
  """
  # Inference must run in eval mode, otherwise dropout stays active and the
  # output is random. The previous mode is restored afterwards.
  was_training = model.training
  model.eval()

  try:
    sen_SRC = sent_padding(text_pipeline_src(sentence), maxlen=seq_len_src).unsqueeze(0).to(device)

    sen_TRG = '<st>'

    # The decoder input below has seq_len_trg - 1 positions, so at most that
    # many tokens can be read back from the predictions.
    max_tokens = seq_len_trg - 1

    while '<end>' not in sen_TRG:

      length = len(sen_TRG.split())
      if length > max_tokens:
        # No '<end>' was produced within the available positions: stop
        # instead of indexing past the end of the predictions.
        break

      trg_input = sent_padding(text_pipeline_trg(sen_TRG), maxlen=seq_len_trg).unsqueeze(0)[:, :-1].to(device)

      # Drop only the batch dimension: [1, seq, vocab] -> [seq, vocab].
      preds = model(sen_SRC, trg_input, src_mask=None, trg_mask=None).squeeze(0)

      # IMPORTANT: the prediction at the last filled position is the next word.
      next_word_idx = torch.argmax(preds, dim=-1)[length-1]

      sen_TRG += (' ' + idx_to_word_trg[next_word_idx.item()])
  finally:
    model.train(was_training)

  return sen_TRG


In [30]:
sentence = 'they are fine'

print(translate(sentence, device))

<st> ellos son bien <end>
