# Transformer 구현 실습

- Transformer 모델을 구현해보는 실습입니다.
- 논문 : https://arxiv.org/abs/1706.03762
- 정답 + 참고 + 원본 : https://github.com/hyunwoongko/transformer
![model](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/model.png)

In [1]:
import torch
import torch.nn as nn
import torch.functional as F
import math

# 1. Positional Encoding
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/positional_encoding.jpg)

In [2]:
class PositionalEncoding(nn.Module):
    """
    Sinusoidal positional encoding.

    The whole ``[max_len, d_model]`` table is computed once in the
    constructor; ``forward`` only slices it. The table is stored as a plain
    tensor attribute (not a parameter) created directly on ``device``.
    """
    def __init__(self, d_model, max_len, device):
        """
        constructor of sinusoid encoding class

        :param d_model: dimension of model
        :param max_len: max sequence length
        :param device: hardware device setting
        """
        super(PositionalEncoding, self).__init__()

        # same size with input matrix (for adding with input matrix)
        self.encoding = torch.zeros(max_len, d_model, device=device)
        self.encoding.requires_grad = False  # we don't need to compute gradient

        pos = torch.arange(0, max_len, device=device)
        pos = pos.float().unsqueeze(dim=1)
        # 1D => 2D unsqueeze to represent word's position: [max_len, 1]

        _2i = torch.arange(0, d_model, step=2, device=device).float()
        # 'i' means index of d_model (e.g. embedding size = 50, 'i' = [0,50])
        # "step=2" means 'i' multiplied with two (same with 2 * i)

        # [max_len, 1] / [ceil(d_model / 2)] broadcasts to one angle per
        # (position, frequency) pair.
        angle = pos / (10000 ** (_2i / d_model))

        self.encoding[:, 0::2] = torch.sin(angle)
        # For an odd d_model there is one odd column fewer than even columns,
        # so the last frequency is dropped for the cosine half.
        self.encoding[:, 1::2] = torch.cos(angle[:, :d_model // 2])
        # compute positional encoding to consider positional information of words

    def forward(self, x):
        """
        Return the encoding rows for the positions present in ``x``.

        :param x: 2D tensor ``[batch_size, seq_len]``; only its shape is read
        :return: tensor ``[seq_len, d_model]`` (no batch dimension, so it
                 broadcasts when added to ``[batch_size, seq_len, d_model]``)
        """
        batch_size, seq_len = x.size()

        return self.encoding[:seq_len, :]

In [3]:
def padding(data, pad_id=0):
    """
    Right-pad every sequence in ``data`` to the length of the longest one.

    The list is modified in place: each shorter entry is replaced by a padded
    copy, and the same list object is returned.

    :param data: list of sequences (lists)
    :param pad_id: value appended to short sequences
    :return: tuple ``(data, max_len)``; ``max_len`` is 0 for an empty list
    """
    # default=0 keeps an empty batch from raising ValueError inside max()
    max_len = max((len(seq) for seq in data), default=0)

    for i, seq in enumerate(data):
        shortfall = max_len - len(seq)
        if shortfall > 0:
            data[i] = seq + [pad_id] * shortfall

    return data, max_len

def test():
    """Smoke-test PositionalEncoding on a small padded batch of token ids."""
    sentences = [
        [62, 13, 47, 39, 78, 33, 56, 13],
        [60, 96, 51, 32, 90],
        [35, 45, 48, 65, 91, 99, 92, 10, 3, 21],
        [66, 88, 98, 47],
        [77, 65, 51, 77, 19, 15, 35, 19, 23]
    ]
    hidden_size = 512

    padded, longest = padding(sentences)
    token_ids = torch.LongTensor(padded)

    target = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    encoder = PositionalEncoding(hidden_size, longest, target)
    encoded = encoder(token_ids)

    # Expected shape is [longest sequence, hidden size]; there is no batch axis.
    assert encoded.size() == torch.Size([10, 512]), 'Worng implementation, Check your shape'
    print('Correct implementation!')


test()

# 2. Scaled Dot-Product Attention & Multi-Head Attention
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/scale_dot_product_attention.jpg)
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/multi_head_attention.jpg)

In [4]:
class ScaleDotProductAttention(nn.Module):
    """
    compute scale dot product attention

    Query : given sentence that we focused on (decoder)
    Key : every sentence to check relationship with Query (encoder)
    Value : every sentence same with Key (encoder)
    """

    def __init__(self):
        super(ScaleDotProductAttention, self).__init__()
        # Normalise over the key axis (last dimension of the score matrix) so
        # that each query's weights sum to 1.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None, e=1e-12):
        """
        :param q: queries ``[batch_size, head, q_length, d_tensor]``
        :param k: keys ``[batch_size, head, length, d_tensor]``
        :param v: values ``[batch_size, head, length, d_tensor]``
        :param mask: optional tensor broadcastable to the score matrix
                     ``[batch_size, head, q_length, length]``; positions where
                     ``mask == 0`` receive zero attention weight
        :param e: unused; kept only so existing callers passing it still work
        :return: tuple ``(weighted values, attention weights)``
        """
        # input is 4 dimension tensor
        # [batch_size, head, length, d_tensor]
        batch_size, head, length, d_tensor = k.size()

        # 1. dot product Query with Key^T to compute similarity
        # transpose() swaps the last two axes; view() would only reinterpret
        # the memory layout and mix up positions and features.
        k_t = k.transpose(2, 3)

        score = (q @ k_t) / math.sqrt(d_tensor)  # scaled dot product

        # 2. apply masking (opt)
        if mask is not None:
            # Use the most negative finite value so masked positions become
            # exactly 0 after softmax (filling with -e ~ 0 masks nothing),
            # while a fully masked row still yields finite numbers.
            score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)

        # 3. pass them softmax to make [0, 1] range
        score = self.softmax(score)

        # 4. multiply with Value
        v = score @ v

        return v, score
     
    
class MultiHeadAttention(nn.Module):
    """
    Multi-head attention: project q/k/v, split the model dimension into
    ``n_head`` heads, attend per head, then merge the heads and project again.
    """
    def __init__(self, d_model, n_head):
        """
        :param d_model: dimension of model; must be divisible by ``n_head``
        :param n_head: number of attention heads
        :raises ValueError: if ``d_model`` is not a multiple of ``n_head``
        """
        super(MultiHeadAttention, self).__init__()
        if d_model % n_head != 0:
            raise ValueError('d_model must be divisible by n_head')
        self.n_head = n_head
        self.attention = ScaleDotProductAttention()
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """
        :param q: ``[batch_size, q_length, d_model]``
        :param k: ``[batch_size, length, d_model]``
        :param v: ``[batch_size, length, d_model]``
        :param mask: optional mask forwarded unchanged to the attention module
        :return: ``[batch_size, q_length, d_model]``
        """
        # 1. dot product with weight matrices
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # 2. split tensor by number of heads
        q, k, v = self.split(q), self.split(k), self.split(v)

        # 3. do scale dot product to compute similarity
        out, attention = self.attention(q, k, v, mask=mask)

        # 4. concat and pass to linear layer
        out = self.concat(out)
        out = self.w_concat(out)

        return out

    def split(self, tensor):
        """
        split tensor by number of head

        :param tensor: [batch_size, length, d_model]
        :return: [batch_size, head, length, d_tensor]
        """
        batch_size, length, d_model = tensor.size()

        d_tensor = d_model // self.n_head
        # Cut the feature axis into heads first, then move the head axis in
        # front of length. A direct view to [batch, head, length, d_tensor]
        # would scatter each position's features across different positions.
        tensor = tensor.view(batch_size, length, self.n_head, d_tensor).transpose(1, 2)
        # it is similar with group convolution (split by number of heads)

        return tensor

    def concat(self, tensor):
        """
        inverse function of self.split(tensor : torch.Tensor)

        :param tensor: [batch_size, head, length, d_tensor]
        :return: [batch_size, length, d_model]
        """
        batch_size, head, length, d_tensor = tensor.size()
        d_model = head * d_tensor

        # Undo the transpose from split(); contiguous() is needed because
        # view() cannot merge axes of a transposed (non-contiguous) tensor.
        tensor = tensor.transpose(1, 2).contiguous().view(batch_size, length, d_model)
        return tensor

In [5]:
def test():
    """Smoke-test MultiHeadAttention with self-attention on a padded batch."""
    target = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    sentences = [
        [62, 13, 47, 39, 78, 33, 56, 13],
        [60, 96, 51, 32, 90],
        [35, 45, 48, 65, 91, 99, 92, 10, 3, 21],
        [66, 88, 98, 47],
        [77, 65, 51, 77, 19, 15, 35, 19, 23]
    ]
    n_tokens = 100
    hidden_size = 512  # hidden size of the model
    head_count = 8  # number of attention heads

    padded, longest = padding(sentences)

    lookup = nn.Embedding(n_tokens, hidden_size)
    token_ids = torch.LongTensor(padded)  # (B, L)
    embedded = lookup(token_ids).to(target)

    pos_encoder = PositionalEncoding(hidden_size, longest, target)
    attention = MultiHeadAttention(hidden_size, head_count).to(target)

    # Add position information to the token embeddings before attending.
    embedded = embedded + pos_encoder(token_ids)

    # Self-attention: the same tensor serves as query, key and value.
    result = attention(embedded, embedded, embedded)

    assert result.shape == torch.Size([5, 10, 512]), 'Worng implementation, Check your shape'
    print('Correct implementation!')


test()

# 3. Layer Norm
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/layer_norm.jpg)

In [6]:
class LayerNorm(nn.Module):
    """
    Layer normalisation over the last dimension with a learnable scale
    (``gamma``) and shift (``beta``).

    NOTE(review): statistics come from ``Tensor.std`` with its default
    settings and ``eps`` is added to the std itself, so outputs may differ
    slightly from ``torch.nn.LayerNorm`` - confirm this is intended.
    """
    def __init__(self, d_model, eps=1e-12):
        """
        :param d_model: size of the last (normalised) dimension
        :param eps: small constant added to the std to avoid division by zero
        """
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        """
        :param x: tensor whose last dimension has size ``d_model``
        :return: tensor of the same shape as ``x``
        """
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        # '-1' means last dimension.

        # Standardise each feature vector, then apply the learnable affine map.
        out = (x - mean) / (std + self.eps)
        out = self.gamma * out + self.beta
        return out

In [7]:
def test():
    """Check LayerNorm output shape and (truncated) values on a tiny batch."""
    samples = [
        [[1., 2., 3.],
         [6., 5., 4.]],

        [[10., 11., 12.],
         [80., 80., 90.]]
    ]
    samples, _ = padding(samples)

    hidden_size = 3  # hidden size of the model
    inputs = torch.Tensor(samples)

    normalizer = LayerNorm(hidden_size)

    # .long() truncates toward zero, so only the integer part is compared.
    result = normalizer(inputs).long()

    expected = torch.tensor([[[-1, 0, 1], [1, 0, -1]], [[-1, 0, 1], [0, 0, 1]]])
    assert result.shape == torch.Size([2, 2, 3]), 'Worng implementation, Check your shape'
    assert torch.all(result == expected), 'Worng implementation, Check your code'
    print('Correct implementation!')


test()

# 4. Positionwise Feed Forward
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/positionwise_feed_forward.jpg)

In [8]:
class PositionwiseFeedForward(nn.Module):
    """
    Two-layer feed-forward network applied to the last dimension:
    Linear(d_model -> hidden) -> ReLU -> Dropout -> Linear(hidden -> d_model).
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        """
        :param d_model: input and output feature size
        :param hidden: size of the inner layer
        :param drop_prob: dropout probability applied after the activation
        """
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """
        :param x: tensor whose last dimension has size ``d_model``
        :return: tensor of the same shape as ``x``
        """
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x

In [9]:
def test():
    """Smoke-test PositionwiseFeedForward on embedded, padded token ids."""
    sentences = [
        [62, 13, 47, 39, 78, 33, 56, 13],
        [60, 96, 51, 32, 90],
        [35, 45, 48, 65, 91, 99, 92, 10, 3, 21],
        [66, 88, 98, 47],
        [77, 65, 51, 77, 19, 15, 35, 19, 23]
    ]
    n_tokens = 100
    hidden_size = 512  # hidden size of the model
    inner_size = 1024

    padded, _ = padding(sentences)

    lookup = nn.Embedding(n_tokens, hidden_size)
    embedded = lookup(torch.LongTensor(padded))

    # The batch is already rectangular here, so this second call changes nothing.
    padded, _ = padding(padded)

    # Dropout probability 0 is passed as the third argument.
    feed_forward = PositionwiseFeedForward(hidden_size, inner_size, 0)
    result = feed_forward(embedded)

    assert result.shape == torch.Size([5, 10, 512]), 'Worng implementation, Check your shape'
    print('Correct implementation!')


test()

# 5. Encoder & Decoder Structure
![](https://raw.githubusercontent.com/hyunwoongko/transformer/master/image/enc_dec.jpg)

# 5.1 Encoder

In [10]:
class EncoderLayer(nn.Module):
    """
    One encoder block: self-attention and a position-wise feed-forward
    network, each followed by dropout, a residual connection and layer norm.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: dimension of model
        :param ffn_hidden: inner size of the feed-forward network
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used throughout the layer
        """
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, src_mask):
        """
        :param x: ``[batch_size, length, d_model]``
        :param src_mask: mask handed to self-attention (may be ``None``)
        :return: tensor of the same shape as ``x``
        """
        # 1. compute self attention
        residual = x
        x = self.attention(q=x, k=x, v=x, mask=src_mask)

        # 2. add and norm
        x = self.dropout1(x)
        x = self.norm1(x + residual)

        # 3. positionwise feed forward network
        residual = x
        x = self.ffn(x)

        # 4. add and norm
        x = self.dropout2(x)
        x = self.norm2(x + residual)
        return x

In [11]:
class TokenEmbedding(nn.Embedding):
    """
    Token Embedding using torch.nn
    maps each token id to a dense vector through a learned weight matrix
    """

    def __init__(self, vocab_size, d_model, padding_idx=0):
        """
        class for token embedding

        :param vocab_size: size of vocabulary
        :param d_model: dimensions of model
        :param padding_idx: id of the padding token. Defaults to 0 because the
                            ``padding`` helper in this file pads with id 0
                            (the previous hard-coded value 1 did not match it).
        """
        super(TokenEmbedding, self).__init__(vocab_size, d_model, padding_idx=padding_idx)
        
class TransformerEmbedding(nn.Module):
    """
    token embedding + positional encoding (sinusoid)
    positional encoding can give positional information to network
    """

    def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
        """
        class for word embedding that included positional information

        :param vocab_size: size of vocabulary
        :param d_model: dimensions of model
        :param max_len: max sequence length, passed to the positional encoding
        :param drop_prob: dropout probability applied to the summed embedding
        :param device: device on which the positional table is created
        """
        super(TransformerEmbedding, self).__init__()
        self.tok_emb = TokenEmbedding(vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """
        :param x: token ids ``[batch_size, seq_len]``
        :return: dropout(token embedding + positional encoding)
        """
        tok_emb = self.tok_emb(x)
        # The positional module only reads the shape of x to pick its rows.
        pos_emb = self.pos_emb(x)
        return self.drop_out(tok_emb + pos_emb)

In [12]:
class Encoder(nn.Module):
    """Embedding followed by a stack of ``n_layers`` encoder layers."""

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        """
        :param enc_voc_size: size of the source vocabulary
        :param max_len: max sequence length
        :param d_model: dimension of model
        :param ffn_hidden: inner size of each feed-forward network
        :param n_head: number of attention heads per layer
        :param n_layers: number of stacked encoder layers
        :param drop_prob: dropout probability
        :param device: device passed on to the embedding
        """
        super().__init__()
        self.emb = TransformerEmbedding(d_model=d_model,
                                        max_len=max_len,
                                        vocab_size=enc_voc_size,
                                        drop_prob=drop_prob,
                                        device=device)

        # A fresh EncoderLayer per iteration, so the layers do not share weights.
        self.layers = nn.ModuleList([EncoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layers)])

    def forward(self, x, src_mask):
        """
        :param x: source token ids, fed to the embedding
        :param src_mask: mask forwarded to every layer (may be ``None``)
        :return: output of the last encoder layer
        """
        x = self.emb(x)

        for layer in self.layers:
            x = layer(x, src_mask)

        return x

In [13]:
def test():
    """Smoke-test the Encoder: run one forward pass and count its layers."""
    target = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    sentences = [
        [62, 13, 47, 39, 78, 33, 56, 13],
        [60, 96, 51, 32, 90],
        [35, 45, 48, 65, 91, 99, 92, 10, 3, 21],
        [66, 88, 98, 47],
        [77, 65, 51, 77, 19, 15, 35, 19, 23]
    ]
    n_tokens = 100
    depth = 6
    hidden_size = 100  # hidden size of the model

    padded, longest = padding(sentences)
    # The batch is already rectangular, so this repeated call changes nothing.
    padded, longest = padding(padded)

    token_ids = torch.LongTensor(padded).to(target)

    encoder = Encoder(
        enc_voc_size=n_tokens,
        max_len=longest,
        ffn_hidden=20,
        d_model=hidden_size,
        device=target,
        drop_prob=0,
        n_head=5,
        n_layers=depth,
    )
    encoder.to(target)

    # Only checks that a forward pass without a mask runs; the output is unused.
    encoded = encoder(token_ids, None)

    assert len(encoder.layers) == depth, 'Worng implementation, Check your Encoder''s layers'
    print('Correct Implementation!')


test()

# 5.2 Decoder

In [14]:
class DecoderLayer(nn.Module):
    """
    One decoder block: self-attention, optional encoder-decoder attention and
    a position-wise feed-forward network, each followed by dropout, a
    residual connection and layer norm.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: dimension of model
        :param ffn_hidden: inner size of the feed-forward network
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used throughout the layer
        """
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.enc_dec_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNorm(d_model=d_model)
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, dec, enc, trg_mask, src_mask):
        """
        :param dec: decoder input ``[batch_size, trg_length, d_model]``
        :param enc: encoder output used as key/value for the second attention;
                    when ``None`` that sub-layer is skipped
        :param trg_mask: mask for the decoder self-attention (may be ``None``)
        :param src_mask: mask for the encoder-decoder attention (may be ``None``)
        :return: tensor of the same shape as ``dec``
        """
        # 1. compute self attention
        residual = dec
        x = self.self_attention(q=dec, k=dec, v=dec, mask=trg_mask)

        # 2. add and norm
        x = self.dropout1(x)
        x = self.norm1(x + residual)

        if enc is not None:
            # 3. compute encoder - decoder attention
            # queries come from the decoder, keys and values from the encoder
            residual = x
            x = self.enc_dec_attention(q=x, k=enc, v=enc, mask=src_mask)

            # 4. add and norm
            x = self.dropout2(x)
            x = self.norm2(x + residual)

        # 5. positionwise feed forward network
        residual = x
        x = self.ffn(x)

        # 6. add and norm
        x = self.dropout3(x)
        x = self.norm3(x + residual)
        return x

In [15]:
class Decoder(nn.Module):
    """
    Embedding, a stack of ``n_layers`` decoder layers, and a final linear
    projection from ``d_model`` to the target vocabulary size.
    """
    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        """
        :param dec_voc_size: size of the target vocabulary
        :param max_len: max sequence length
        :param d_model: dimension of model
        :param ffn_hidden: inner size of each feed-forward network
        :param n_head: number of attention heads per layer
        :param n_layers: number of stacked decoder layers
        :param drop_prob: dropout probability
        :param device: device passed on to the embedding
        """
        super().__init__()
        self.emb = TransformerEmbedding(d_model=d_model,
                                        drop_prob=drop_prob,
                                        max_len=max_len,
                                        vocab_size=dec_voc_size,
                                        device=device)

        # A fresh DecoderLayer per iteration, so the layers do not share weights.
        self.layers = nn.ModuleList([DecoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layers)])

        self.linear = nn.Linear(d_model, dec_voc_size)

    def forward(self, trg, src, trg_mask, src_mask):
        """
        :param trg: target token ids, fed to the embedding
        :param src: encoder output, handed to every layer as its ``enc`` input
        :param trg_mask: mask for decoder self-attention (may be ``None``)
        :param src_mask: mask for encoder-decoder attention (may be ``None``)
        :return: tensor whose last dimension has size ``dec_voc_size``
        """
        trg = self.emb(trg)

        for layer in self.layers:
            trg = layer(trg, src, trg_mask, src_mask)

        # pass to LM head
        output = self.linear(trg)
        return output

In [16]:
def test():
    """
    Smoke-test Encoder + Decoder end to end and verify the decoder output
    shape (the check was previously missing, so any output printed success).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = [
        [62, 13, 47, 39, 78, 33, 56, 13],
        [60, 96, 51, 32, 90],
        [35, 45, 48, 65, 91, 99, 92, 10, 3, 21],
        [66, 88, 98, 47],
        [77, 65, 51, 77, 19, 15, 35, 19, 23]
    ]
    vocab_size = 100
    data, max_len = padding(data)
    n_layers = 6

    d_model = 100  # hidden size of the model
    batch = torch.LongTensor(data).to(device)

    encoder = Encoder(enc_voc_size=vocab_size, max_len=max_len, ffn_hidden=20, d_model=d_model,
                      device=device, drop_prob=0, n_head=5, n_layers=n_layers)
    encoder.to(device)
    src = encoder(batch, None)

    decoder = Decoder(dec_voc_size=vocab_size, max_len=max_len, d_model=d_model, ffn_hidden=20,
                      n_head=5, n_layers=n_layers, drop_prob=0, device=device)
    decoder.to(device)
    # The same batch is used as the target input; no masks are applied.
    output = decoder(batch, src, None, None)

    # One row of vocabulary scores per (sentence, position) pair.
    expected_shape = torch.Size([len(data), max_len, vocab_size])
    assert output.shape == expected_shape, 'Wrong implementation, Check your shape'
    assert len(decoder.layers) == n_layers, 'Wrong implementation, Check your Decoder layers'
    print('Correct Implementation!')


test()