# Transformer from scratch

In this notebook, we build a simplified Transformer from scratch, introducing its modules one by one.

### PyTorch requirements

In [49]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
import matplotlib.pyplot as plt

### Scaled Dot Product Attention

In [50]:
class ScaledDotProductAttention(nn.Module):
    '''Scaled dot-product attention: softmax((q / temperature) @ k^T) @ v.

    Args:
        temperature: divisor applied to the queries before the dot product.
        dropout: dropout probability applied to the attention weights.
    '''

    def __init__(self, temperature, dropout = 0.1):
        super().__init__()
        self.temperature = temperature
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask = None):
        '''Attend from the queries to the keys and mix the values.

        Args:
            q, k, v: tensors whose last two axes are (length, features); any
                leading axes (batch, heads, ...) must be broadcast-compatible.
            mask: optional tensor broadcastable to the score matrix; positions
                where it equals 0 are excluded from attention.

        Returns:
            A tuple ``(output, attn)`` holding the weighted values and the
            attention weights after softmax and dropout.
        '''
        # Swap the last two axes instead of hard-coding (2, 3), so inputs
        # without a head axis (3-D) work as well as the usual 4-D layout.
        attn = torch.matmul(q / self.temperature, k.transpose(-2, -1))

        if mask is not None:
            # A large negative score becomes ~0 probability after the softmax.
            attn = attn.masked_fill(mask == 0, -1e9)

        attn = self.dropout(F.softmax(attn, dim = -1))
        output = torch.matmul(attn, v)

        return output, attn

### Multi head attention

In [51]:
class MultiHeadAttention(nn.Module):
    '''Multi-head attention with output projection, dropout, residual add and LayerNorm.

    Args:
        n_head: number of attention heads; must divide ``d_out``.
        d_in: feature size of the incoming q / k / v tensors.
        d_out: total feature size after projection (split across heads).
        dropout: dropout probability applied to the projected output.
        qkv_bias: whether the linear projections carry a bias term.
    '''

    def __init__(self, n_head, d_in, d_out, 
                 dropout = 0.1, qkv_bias = False):
        super().__init__()
        assert d_out % n_head == 0

        self.d_out, self.n_head, self.d_in = d_out, n_head, d_in
        self.d_head = d_out // n_head

        # Projections are created in a fixed order so parameter ordering
        # (and seeded initialisation) stays stable.
        self.w_query = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.w_key = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.w_value = nn.Linear(d_in, d_out, bias = qkv_bias)
        self.out = nn.Linear(d_out, d_out, bias = qkv_bias)

        # Scores are scaled by sqrt(d_head) inside the attention module.
        self.attention = ScaledDotProductAttention(temperature = self.d_head ** 0.5)

        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(d_out, eps = 1e-6)

    def _split_heads(self, projected, batch_size):
        '''Reshape (batch, length, d_out) into (batch, n_head, length, d_head).'''
        per_head = projected.view(batch_size, projected.size(1), self.n_head, self.d_head)
        return per_head.transpose(1, 2)

    def forward(self, q, k, v, mask = None):
        '''Return ``(output, attn)`` for the given queries, keys and values.

        ``q`` doubles as the residual input, so its feature size has to be
        addable to the ``d_out``-sized projection.
        '''
        batch_size, query_len = q.size(0), q.size(1)

        heads_q = self._split_heads(self.w_query(q), batch_size)
        heads_k = self._split_heads(self.w_key(k), batch_size)
        heads_v = self._split_heads(self.w_value(v), batch_size)

        # Insert a head axis so one mask broadcasts over every head.
        head_mask = None if mask is None else mask.unsqueeze(1)

        context, attn = self.attention(heads_q, heads_k, heads_v, mask = head_mask)

        # Merge the heads back into a single feature axis.
        merged = context.transpose(1, 2).contiguous().view(batch_size, query_len, -1)

        projected = self.dropout(self.out(merged))
        projected += q

        return self.layer_norm(projected), attn

In [52]:
""" Test MultiHeadAttention """

# Smoke test: 4 heads over 8 features, self-attention on a random
# (batch=2, length=3, features=8) input.
mha = MultiHeadAttention(n_head=4, d_in=8, d_out=8)

q = torch.rand((2, 3, 8))

# The same tensor serves as query, key and value.
o, attn = mha(q, k=q, v=q)
print(o.shape)
print(attn.shape)

torch.Size([2, 3, 8])
torch.Size([2, 4, 3, 3])


### Position Wise FeedForward

In [53]:
class PositionwiseFeedForward(nn.Module):
    '''Two-layer feed-forward sub-layer with dropout, residual add and LayerNorm.

    Args:
        d_in: feature size of the input (and of the output).
        d_hid: feature size of the hidden layer.
        dropout: dropout probability applied before the residual add.
    '''

    def __init__(self, d_in, d_hid, dropout=0.1):
        super().__init__()
        self.w_up = nn.Linear(d_in, d_hid)      # expand to the hidden size
        self.w_down = nn.Linear(d_hid, d_in)    # project back to the input size
        self.layer_norm = nn.LayerNorm(d_in, eps=1e-6)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        '''Return LayerNorm(x + dropout(w_down(relu(w_up(x))))).'''
        hidden = F.relu(self.w_up(x))
        projected = self.dropout(self.w_down(hidden))
        projected += x
        return self.layer_norm(projected)

In [54]:
""" Test PositionwiseFeedForward """

# Smoke test: feed the attention output through an FFN whose hidden size
# is four times the feature size.
ffn = PositionwiseFeedForward(d_in=8, d_hid=4 * 8)

out = ffn(o)
print(out.shape)

torch.Size([2, 3, 8])


### Encoder and Decoder Stacks


#### Positional Encoding

In [55]:
class PositionalEncoding(nn.Module):
    '''Add fixed sinusoidal position encodings to a batch of embeddings.

    Args:
        d_out: feature size of the embeddings.
        n_position: number of positions pre-computed in the table, i.e. the
            longest sequence that can be encoded.
    '''

    def __init__(self, d_out, n_position=200):
        super().__init__()

        # A buffer rather than a parameter: it follows the module across
        # devices and is stored in the state dict, but is never trained.
        self.register_buffer('pos_table', self._get_encoding_table(n_position, d_out))

    def _get_encoding_table(self, n_position, d_out):
        '''Build the (1, n_position, d_out) sinusoid table.'''
        positions = np.arange(n_position, dtype=np.float64)[:, None]
        dims = np.arange(d_out)
        # Dimensions 2i and 2i+1 share the wavelength 10000^(2i / d_out).
        wavelengths = np.power(10000, 2 * (dims // 2) / d_out)

        sinusoid_table = positions / wavelengths
        sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
        sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1

        return torch.FloatTensor(sinusoid_table).unsqueeze(0)

    def forward(self, x):
        '''Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Raises:
            ValueError: if the sequence is longer than the pre-computed table.
        '''
        # Slice by the sequence length (axis 1); slicing with the whole
        # ``x.size()`` is a TypeError.
        seq_len = x.size(1)
        if seq_len > self.pos_table.size(1):
            raise ValueError(
                f'sequence length {seq_len} exceeds n_position '
                f'{self.pos_table.size(1)}')
        return x + self.pos_table[:, :seq_len].clone().detach()


#### Encoder 

In [56]:
class EncoderLayer(nn.Module):
    '''One encoder block: self-attention followed by a position-wise feed-forward net.

    Args:
        n_head: number of attention heads.
        d_in: input feature size; also sizes the feed-forward net (hidden = 4 * d_in).
        d_out: feature size produced by the attention sub-layer.
        dropout: dropout probability used in both sub-layers.
        qkv_bias: whether the attention projections carry a bias term.
    '''

    def __init__(self, n_head, d_in, d_out, 
                 dropout = 0.1, qkv_bias = False):
        super().__init__()

        self.slf_attn = MultiHeadAttention(
            n_head, d_in, d_out, dropout, qkv_bias)
        self.pos_ffn = PositionwiseFeedForward(d_in, d_in * 4, dropout)

    def forward(self, enc_input, slf_attn_mask = None):
        '''Return ``(enc_output, enc_slf_attn)`` for one encoder block.'''
        # Self-attention: the input is query, key and value at once.
        attended, attn_weights = self.slf_attn(
            enc_input, enc_input, enc_input, mask = slf_attn_mask)
        return self.pos_ffn(attended), attn_weights

In [57]:
class Encoder(nn.Module):
    '''An encoder model with self attention mechanism.

    Args:
        src_vocab_size: number of entries in the source embedding table.
        word_dim: embedding size, used as the model width in every layer.
        n_layers: number of stacked encoder layers.
        n_head: number of attention heads per layer.
        pad_idx: index of the padding token (passed to nn.Embedding as padding_idx).
        dropout: dropout probability.
        qkv_bias: whether the attention projections carry a bias term.
        n_position: number of positions in the positional-encoding table.
    '''

    def __init__(
            self, src_vocab_size, word_dim, n_layers, n_head, pad_idx, 
            dropout = 0.1, qkv_bias = False, n_position = 200,
    ):
        super().__init__()
        # Must be named ``src_word_emb``: that is the attribute forward() reads.
        self.src_word_emb = nn.Embedding(src_vocab_size, word_dim, padding_idx=pad_idx)
        self.position_enc = PositionalEncoding(word_dim, n_position=n_position)
        self.dropout = nn.Dropout(dropout)
        self.layer_stack = nn.ModuleList([
            EncoderLayer(n_head, word_dim, word_dim, dropout, qkv_bias)
            for _ in range(n_layers)])
        self.layer_norm = nn.LayerNorm(word_dim, eps=1e-6)
        
    def forward(self, src_seq, src_mask, return_attns = False):
        '''Encode a batch of source token ids.

        Args:
            src_seq: tensor of token indices fed to the embedding layer.
            src_mask: attention mask handed to every layer's self-attention.
            return_attns: when true, also return each layer's attention weights.

        Returns:
            ``enc_output`` alone, or ``(enc_output, enc_slf_attn_list)`` when
            ``return_attns`` is true.
        '''
        enc_slf_attn_list = []

        # Embed, add position information, then normalise before the stack.
        enc_output = self.src_word_emb(src_seq)
        enc_output = self.dropout(self.position_enc(enc_output))
        enc_output = self.layer_norm(enc_output)

        for enc_layer in self.layer_stack:
            enc_output, enc_slf_attn = enc_layer(enc_output, slf_attn_mask=src_mask)
            if return_attns:
                enc_slf_attn_list.append(enc_slf_attn)

        if return_attns:
            return enc_output, enc_slf_attn_list
        return enc_output

#### Decoder

In [58]:
class DecoderLayer(nn.Module):
    '''One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Args:
        n_head: number of attention heads.
        d_in: input feature size; also sizes the feed-forward net (hidden = 4 * d_in).
        d_out: feature size produced by the attention sub-layers.
        dropout: dropout probability used in all sub-layers.
        qkv_bias: whether the attention projections carry a bias term.
    '''

    def __init__(self, n_head, d_in, d_out,
                 dropout = 0.1, qkv_bias = False):
        super().__init__()
        # qkv_bias is forwarded to both attention modules so the flag is
        # honoured here the same way EncoderLayer honours it.
        self.slf_attn = MultiHeadAttention(n_head, d_in, d_out, dropout, qkv_bias)
        self.enc_attn = MultiHeadAttention(n_head, d_in, d_out, dropout, qkv_bias)
        self.pos_ffn = PositionwiseFeedForward(d_in, d_in * 4, dropout)

    def forward(self, dec_input, enc_output, slf_attn_mask=None, 
                dec_enc_attn_mask = None):
        '''Return ``(dec_output, dec_slf_attn, dec_enc_attn)`` for one block.

        Args:
            dec_input: decoder-side input to this block.
            enc_output: encoder output used as keys and values in cross-attention.
            slf_attn_mask: mask for the decoder self-attention.
            dec_enc_attn_mask: mask for the encoder-decoder attention.
        '''
        dec_output, dec_slf_attn = self.slf_attn(
            dec_input, dec_input, dec_input, mask = slf_attn_mask)
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        dec_output, dec_enc_attn = self.enc_attn(
            dec_output, enc_output, enc_output, mask = dec_enc_attn_mask
        )
        dec_output = self.pos_ffn(dec_output)
        return dec_output, dec_slf_attn, dec_enc_attn

In [59]:
class Decoder(nn.Module):
    '''A decoder model: embedding, positional encoding and a stack of decoder layers.

    Args:
        trg_vocab_size: number of entries in the target embedding table.
        word_dim: embedding size, used as the model width in every layer.
        n_layers: number of stacked decoder layers.
        n_head: number of attention heads per layer.
        pad_idx: index of the padding token (passed to nn.Embedding as padding_idx).
        dropout: dropout probability.
        qkv_bias: forwarded to every DecoderLayer.
        n_position: number of positions in the positional-encoding table.
    '''

    def __init__(
            self, trg_vocab_size, word_dim, n_layers, n_head, pad_idx,
            dropout = 0.1, qkv_bias = False, n_position = 200,
    ):
        super().__init__()
        self.trg_word_emb = nn.Embedding(
            trg_vocab_size, word_dim, padding_idx=pad_idx)
        self.position_enc = PositionalEncoding(word_dim, n_position=n_position)
        self.dropout = nn.Dropout(dropout)
        self.layer_stack = nn.ModuleList(
            DecoderLayer(n_head, word_dim, word_dim, dropout, qkv_bias)
            for _ in range(n_layers))
        self.layer_norm = nn.LayerNorm(word_dim, eps=1e-6)

    def forward(self, trg_seq, trg_mask, enc_output, src_mask, return_attns = False):
        '''Decode a batch of target token ids against the encoder output.

        Returns:
            A 1-tuple ``(dec_output,)``, or the 3-tuple
            ``(dec_output, dec_slf_attn_list, dec_enc_attn_list)`` when
            ``return_attns`` is true. The result is always a tuple.
        '''
        self_attns, cross_attns = [], []

        # Embed, add position information, then normalise before the stack.
        embedded = self.trg_word_emb(trg_seq)
        hidden = self.layer_norm(self.dropout(self.position_enc(embedded)))

        for layer in self.layer_stack:
            hidden, self_attn, cross_attn = layer(
                hidden, enc_output,
                slf_attn_mask=trg_mask, dec_enc_attn_mask=src_mask)
            if return_attns:
                self_attns.append(self_attn)
                cross_attns.append(cross_attn)

        if return_attns:
            return hidden, self_attns, cross_attns
        # Deliberately a 1-tuple, not a bare tensor.
        return (hidden,)

### Transformer

In [60]:
def get_pad_mask(seq, pad_idx):
    '''Return a boolean mask that is True wherever ``seq`` is not padding.

    A singleton axis is inserted before the last one so the mask can
    broadcast over query positions.
    '''
    not_padding = torch.ne(seq, pad_idx)
    return not_padding.unsqueeze(-2)


def get_subsequent_mask(seq):
    ''' Causal mask: position i may attend to positions 0..i only. '''
    # Unpacking also enforces that ``seq`` is exactly 2-D.
    _, seq_len = seq.size()
    # The lower triangle (diagonal included) marks the visible positions.
    visible = torch.tril(torch.ones((1, seq_len, seq_len), device=seq.device))
    return visible.bool()

In [61]:
class Transformer(nn.Module):
    ''' A sequence to sequence model with attention mechanism.

    Args:
        n_src_vocab: source vocabulary size.
        n_trg_vocab: target vocabulary size.
        src_pad_idx: padding index in the source vocabulary.
        trg_pad_idx: padding index in the target vocabulary.
        d_word_vec: embedding size, used as the model width (``d_model``).
        n_layers: number of encoder layers and of decoder layers.
        n_head: number of attention heads.
        dropout: dropout probability used by the encoder and decoder.
        n_position: size of the positional-encoding tables.
        trg_emb_prj_weight_sharing: gates the output scaling, see below.
        emb_src_trg_weight_sharing: accepted for interface compatibility.
        scale_emb_or_prj: one of 'emb', 'prj' or 'none'.
    '''

    def __init__(
            self, n_src_vocab, n_trg_vocab, src_pad_idx, trg_pad_idx,
            d_word_vec=512, n_layers=6, n_head=8, dropout=0.1, n_position=200,
            trg_emb_prj_weight_sharing=True, emb_src_trg_weight_sharing=True,
            scale_emb_or_prj='prj'):

        super().__init__()

        self.src_pad_idx, self.trg_pad_idx = src_pad_idx, trg_pad_idx
        # forward() reads the model width when scaling the logits.
        self.d_model = d_word_vec

        # In section 3.4 of paper "Attention Is All You Need", there is such detail:
        # "In our model, we share the same weight matrix between the two
        # embedding layers and the pre-softmax linear transformation...
        # In the embedding layers, we multiply those weights by \sqrt{d_model}".
        #
        # Options here:
        #   'emb': multiply \sqrt{d_model} to embedding output
        #   'prj': multiply (\sqrt{d_model} ^ -1) to linear projection output
        #   'none': no multiplication
        #
        # NOTE(review): only the 'prj' scaling is applied in this class. The
        # 'emb' option is accepted but has no effect, and neither
        # *_weight_sharing flag ties any weights here - confirm whether
        # embedding scaling and weight tying should be implemented.

        assert scale_emb_or_prj in ['emb', 'prj', 'none']
        self.scale_prj = (scale_emb_or_prj == 'prj') if trg_emb_prj_weight_sharing else False

        # Pass the caller's dropout / n_position through rather than constants.
        self.encoder = Encoder(n_src_vocab, d_word_vec, n_layers, n_head, src_pad_idx, 
            dropout = dropout, qkv_bias = False, n_position = n_position,)

        self.decoder = Decoder(n_trg_vocab, d_word_vec, n_layers, n_head, trg_pad_idx, 
            dropout = dropout, qkv_bias = False, n_position = n_position,)

        self.trg_word_prj = nn.Linear(d_word_vec, n_trg_vocab, bias=False)

        # Xavier-initialise every weight matrix (biases and norms are 1-D).
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p) 

    def forward(self, src_seq, trg_seq):
        '''Return target-vocabulary logits for every target position.

        Args:
            src_seq: source token ids - presumably (batch, src_len); confirm
                against the caller.
            trg_seq: 2-D tensor of target token ids, (batch, trg_len).

        Returns:
            Logits flattened over the leading axes, with the last axis of
            size ``n_trg_vocab``.
        '''
        src_mask = get_pad_mask(src_seq, self.src_pad_idx)
        # Target positions must be neither padding nor in the future.
        trg_mask = get_pad_mask(trg_seq, self.trg_pad_idx) & get_subsequent_mask(trg_seq)

        # The encoder returns a bare tensor here; star-unpacking it would
        # split it along the batch axis.
        enc_output = self.encoder(src_seq, src_mask)
        # The decoder returns a tuple whose first item is the output.
        dec_output, *_ = self.decoder(trg_seq, trg_mask, enc_output, src_mask)
        seq_logit = self.trg_word_prj(dec_output)
        if self.scale_prj:
            seq_logit = seq_logit * self.d_model ** -0.5

        return seq_logit.view(-1, seq_logit.size(2))

### Reference

* [The Illustrated Transformer](https://jalammar.github.io/illustrated-transformer/)
* [LLM Visualization](https://bbycroft.net/llm)