In [1]:
# https://github.com/hyunwoongko/transformer

In [2]:
# https://github.com/nawnoes/pytorch-transformer/blob/main/model/transformer.py
# 간단한 transformer만 구현해보도록 하겠다.
# 2-th block 코드 읽고 해석용 (pytorch 기반)

In [3]:
# kyujin Han
# 2022-02-13
# Title: Transformers 구현하기 (BERT 및 VIT 공부를 위한 Self-Study)

In [4]:
# 주의할점 <아닐 수도 있지만, 경험에 의해서>
# 코드를 읽을 때 mask를 하나의 다른 vector(행렬)이라고 생각해야한다.
# 즉, masking하기 위한 위치를 표현하는 행렬(벡터)라는 것이다.
# 구현방식에 따라 차이가 있겠지만, 해당 코드는 그러한 것 같다.

# 'Attention Is All You Need' 논문 읽고 난 후 코드 읽기를 추천

In [5]:
""" 1. Import Module """
import math
import torch
import torch.nn as nn
from torch.autograd import Variable # 옛날 pytorch에서는 필요했지만, 최신버전이라면 필요는 없다. 그러나 github 코드와 똑같이 구현해보겠다.
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from model.util import clones
from transformers.activations import get_activation

In [6]:
# Transpose 설명 (모르는 사람들을 위한 간단한 실습)
a = torch.randn(2,3,4)
print(torch.transpose(a,2,1)) # (2,4,3)
print(a.transpose(0,1)) # (3,2,4)

tensor([[[-1.9775, -0.0458,  0.8604],
         [-0.8800, -0.4634,  2.5793],
         [-2.5340,  0.8712,  2.1390],
         [-0.8467,  0.4708, -1.1788]],

        [[-0.4873, -0.3576,  2.0115],
         [ 0.1328,  1.0732, -0.1254],
         [-0.5781,  0.3483,  1.1489],
         [ 2.3782,  0.8393, -0.2120]]])
tensor([[[-1.9775, -0.8800, -2.5340, -0.8467],
         [-0.4873,  0.1328, -0.5781,  2.3782]],

        [[-0.0458, -0.4634,  0.8712,  0.4708],
         [-0.3576,  1.0732,  0.3483,  0.8393]],

        [[ 0.8604,  2.5793,  2.1390, -1.1788],
         [ 2.0115, -0.1254,  1.1489, -0.2120]]])


self-Attention의 경우 Query Q, Key K, Value V를 입력으로 받아   
MatMul(Q,K) -> Scale -> Masking(opt. Decoder) -> Softmax -> MatMul(result, V)

In [8]:
''' 2-1. Self-Attention 구현 '''
def self_attention(query, key, value, mask=None):
    # input 4 dimensional vector
    # [batch_size, head_num, length, d_tensor]
    
    key_transpose = torch.transpose(key,-2,-1) # (batch_Size, head_num, d_k, token_) 
    matmul_result = torch.matmul(query, key_transpose)
    d_k = query.size()[-1]
    attention_score = matmul_result/math.sqrt(d_k) # Scale
    
    # Masked Multi head Attention에 필요한 조건
    if mask is not None:
        attention_score = attention_score.masked_fill(mask==0, -1e20) # mask가 0인 곳은 {음의 무한대}로
    
    softmax_attention_score = F.softmax(attention_score, dim=-1) # score 행렬
    result = torch.matmul(softmax_attention_score, value)
    
    return result, sofmax_attention_score

멀티헤드 어텐션  
MultiHead(Q,K,V) = Concat(head_1,head_2,...head_n)W^O  
head_i = Attention(QW^Q_i, KW^K_i, VW^V_i)  
W^Q는 모델의 dimension x d_k  
W^K는 모델의 dimension x d_k  
W^V는 모델의 dimension x d_v  
W^O는 d_v * head 갯수 x 모델 dimension  
논문에서는 헤더의 갯수를 8개 사용  

In [9]:
''' 2-2. Multi-Head-Attention 구현 '''
class MultiHeadAttention(nn.Module):
    def __init__(self, head_num=8, d_model=512, dropout=0.1): # 모두 논문의 수치
        super(MultiHeadAttention, self).__init__()
        
        self.head_num = head_num
        self.d_model = d_model
        self.d_k = self.d_v = d_model // head_num # 64
        
        # 논문을 바탕으로 코드 수정함
        self.w_q = nn.Linear(d_model, d_k)
        self.w_k = nn.Linear(d_model, d_k)
        self.w_v = nn.Linear(d_model, d_v)
        self.w_o = nn.Linear(self.head_num*d_model, d_model)
        
        self.self_attention = self_attention
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # same mask applied to all h heads.
            mask = mask.unsqueeze(1) # 차원이 1인 dimension 추가 (mask 차원에)
            
        batche_num = query.size(0)
        
        query = self.w_q(query).view(batche_num, -1, self.head_num, self.d_k).transpose(1,2) # (batch, head_num, token_, d_k)
        key = self.w_k(key).view(batche_num, -1, self.head_num, self.d_k).transpose(1,2)
        value = self.w_v(value).view(batche_num, -1, self.head_num, self.d_k).transpose(1,2)
        
        attention_result, attention_score = self.self_attention(query, key, value, mask)
        
        # 원래의 모양으로 다시 변형해준다.
        # torch.continuos는 다음행과 열로 이동하기 위한 stride가 변형되어
        # 메모리 연속적으로 바꿔야 한다!
        # 참고 문서: https://discuss.pytorch.org/t/contigious-vs-non-contigious-tensor/30107/2
        attention_result = attention_result.transpose(1,2).contiguous().view(batche_num, -1, self.head_num*self.d_k)
        
        return self.w_o(attention_result)

Position-wise Feed-Forward Networks  
FFN(x) = max(0,xW_1 + b_1)W_2+b2  
입력과 출력은 모두 d_model의 dimension을 가지고  
내부의 레이어는 d_model * 4의 dimension을 가진다.  

In [10]:
''' 3. Feed Forward Network '''
class FeedForward(nn.Module):
    def __init__(self,d_model, dropout = 0.1):
        super(FeedForward,self).__init__()
        self.w_1 = nn.Linear(d_model, d_model*4) # (512, 2048)
        self.w_2 = nn.Linear(d_model*4, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x)))) # Use relu activation

Layer Normalization  
: layer의 hidden unit들에 대해서 mean과 variance를 구한다.   
nn.Parameter는 모듈 파라미터로 여겨지는 텐서  

In [11]:
''' 4. Layer Normalization and Residual connection '''
class LayerNorm(nn.Module):
    def __init__(self, features, eps=1e-6):
        super(LayerNorm,self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features)) # tensor 자료형 유지를 위해 있는 것으로 생각. (왜 필요한지 모르겠음)
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps
        
    def forward(self, x):
        mean = x.mean(-1, keepdim =True) # 평균
        std = x.std(-1, keepdim=True)    # 표준편차

        return self.a_2 * (x-mean)/ (std + self.eps) + self.b_2 # Normalization

class ResidualConnection(nn.Module):
    def __init__(self, size, dropout):
        super(RedisualConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, sublayer):
        return x + self.dropout((sublayer(self.norm(x))))

In [12]:
''' 5. Encoder '''
# MultiHeadAttention + FeedForward
class Encoder(nn.Module):
    def __init__(self, d_model, head_num, dropout):
        super(Encoder, self).__init__()
        self.multi_head_attention = MultiHeadAttention(d_model=d_model, head_num=head_num)
        self.residual_1 = ResidualConnection(d_model, dropout=dropout)
        
        self.feed_forward = FeedForward(d_model)
        self.residual_2 = ResidualConnection(d_model, dropout=dropout)
        
    def forward(self, input, mask):
        x = self.residual_1(input, lambda x: self.multi_head_attention(x,x,x,mask)) # 논문에서 query, key, value의 weight을 공유.
        x = self.residual_2(x, lambda x: self.feed_forward(x))
        return x

Decoder 블록은 FeedForward 레이어와 MultiHead 어텐션, Masked Multihead 어텐션 레이어를 가진다.  
MaskedMultiHeadAttention -> MultiHeadAttention(encoder-decoder attention) -> FeedForward  

In [13]:
''' 6. Decoder '''
class Decoder(nn.Module):
    def __init__(self, d_model, head_num, dropout):
        super(Decoder, self).__init__()
        self.masked_multi_head_attention = MultiHeadAttention(d_model = d_model, head_num = head_num)
        self.residual_1 = ResidualConnection(d_model, dropout = dropout)
        
        self.encoder_decoder_attention = MultiHeadAttention(d_model = d_model, head_num = head_num)
        self.residual_2 = ResidualConnection(d_model, dropout = dropout)
        
        self.feed_forward = FeedForward(d_model)
        self.residual_3 = ResidualConnection(d_model, dropout = dropout)
        
    def forward(self, target, encoder_output, target_mask, encoder_mask):
        """
        Residual_2 부분에 대한 추가 설명
        위와 다르게 이 부분은 Self attention이라고 부르지 않는다.
        Encoder-Decoder-attention 이라고 하는데, Query 값은 이전 SubLayer의 Masked_Multi-Head-Attention의 값에서,
        그리고 key와 Value 값은 Encoder의 Output값에서 만든다.
        """
        x = self.residual_1(target, lambda x: self.masked_multi_head_attention(x,x,x,target_mask))
        x = self.residual_2(x, lambda x: self.encoder_decoder_attention(x, encoder_output, encoder_output, encoder_mask))
        x = self.residual_3(x, self.feed_forward)
        return x
    

In [14]:
''' 7. Embedding Vector '''
class Embeddings(nn.Module):
    def __init__(self, vocab_num, d_model):
        super(Embeddings,self).__init__()
        self.emb = nn.Embedding(vocab_num,d_model)
        self.d_model = d_model
    def forward(self, x):
        """
        1) 임베딩 값에 math.sqrt(self.d_model)을 곱해주는 이유는 무엇인지 찾아볼것 ==> 논문 3.4 근거
        2) nn.Embedding에 다시 한번 찾아볼것
        """
        return self.emb(x) * math.sqrt(self.d_model)

Positional Encoding  
트랜스포머는 RNN이나 CNN을 사용하지 않기 때문에 입력에 순서 값을 반영해줘야 한다.  
예) 나는 어제의 오늘  
PE (pos,2i) = sin(pos/10000^(2i/d_model))  
PE (pos,2i+1) = cos(pos/10000^(2i/d_model))   

In [15]:
''' 8. Positional Encoding '''
class PositionalEncoding(nn.Module):
    def __init__(self, max_seq_len, d_model, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        
        pe = torch.zeros(max_seq_len, d_model)
        
        # 논문의 sins and cosine functions 참고
        position = torch.arange(0, max_seq_len).unsqeeze(1) # 1D => 2D
        base = torch.ones(d_model//2).fill_(10000)
        pow_term = torch.arange(0, d_model, 2)/torch.tensor(d_model, dtype=torch.float32)
        div_term = torch.pow(base, pow_term)
        
        pe[:, 0::2] = torch.sin(position/div_term)
        pe[:, 1::2] = torch.cos(position/div_term)
        
        # pe를 학습되지 않는 변수로 등록
        self.register_buffer('position_encoding',pe) # self.positional_encoding 밑의 변수
        
    def forward(self,x):
        x = x+Variable(self.positional_encoding[:,:x.size(1)], requires_grad=False)
        return self.dropout(x)
    
class PositionalEmbedding(nn.Module):
    def __init__(self, dim, max_seq_len):
        super(PositionalEmbedding, self).__init__()
        self.embedding = nn.Embedding(max_seq_len, dim)

    def forward(self, x):
        t = torch.arange(x.shape[1], device=x.device) # 순서 저장 (embedding vector 길이)
        return self.embedding(t)

In [16]:
''' 9. Linear 부분 (Decoder 이후)'''
class Generator(nn.Module):
    def __init__(self, d_model, vocab_num):
        super(Generator, self).__init__()
        self.proj_1 = nn.Linear(d_model, d_model*4)
        self.proj_2 = nn.Linear(d_model*4, vocab_num)

    def forward(self, x):
        x = self.proj_1(x)
        x = self.proj_2(x)
        return x

In [17]:
# http://incredible.ai/nlp/2020/02/29/Transformer/
# Part 2.4 

# Masking에 대한 자세한 설명과, 코드에서 unsqueeze()과 어떻게 masking 역할을 할 수 있는지에 대한 설명.

## 추가자료
## +) # https://paul-hyun.github.io/transformer-01/ 

In [18]:
''' 10. Transformer 구현 '''
class Transformer(nn.Module):
    def __init__(self, vocab_num, d_model, max_seq_len, head_num, dropout, N):
        super(Transformer, self).__init__()
        self.embedding = Embeddings(vocab_num, d_model) # input Embeddings
        self.positional_encoding = PositionalEncoding(max_seq_len, d_model) # positional embedding

        self.encoders = clones(Encoder(d_model=d_model, head_num=head_num, dropout=dropout),N) # 논문 기준, N=6
        self.decoders = clones(Decoder(d_model=d_model, head_num=head_num, dropout=dropout),N) # N=6

        self.generator = Generator(d_model, vocab_num)
    
    # BPE(byte-pair-encoding)을 통해 tokenize를 진행
    # 
    def forward(self, input, target, input_mask, target_mask, labels=None):
        """
        1) input값을 embedding vector로 만들어 준 다음에 positional encoding을 한다.
        2) 그리고 encoder에 넣는다.
        """
        x = self.positional_encoding(self.embedding(input))
        for encoder in self.encoders:
            x = encoder(x, input_mask) # input_mask 같은 경우에는 이제 pad의 위치를 담고 있는 것이 아닐까 생각해본다.
        
        """
        3) 타깃문장을 넣어서 embedding vector를 생성한 후 positional encoding을 한다.
        4) decoder를 실행한다.
        """
        target = self.positional_encoding(self.embedding(target))
        for decoder in self.decoders:
            target = decoder(target, x, target_mask, input_mask) # target_mask는 mask를 할려는 위치를 check한 것이 아닐까 생각해본다.
        
        '''
        5) decoder에서 나온 값을 linear 결합한다.
        # 원래 decoder 마지막에는 linear에서 나온 값을 바탕으로 softmax가 있어야 한다.
        Ex code) 
            prediction = lm_logits.view(-1, lm_logits.size(-1))
            result = F.softmax(prediction, dim=1)
            pred_label = torch.argmax(result, dim=1)
        (스스로 생각해서 작성한거라 틀릴 수도 있음. 그러나 이런 형태로 만들어지는건 틀림없음.)
        '''
        lm_logits = self.generator(target)
        loss = None
        if labels is not None:
            # Shift so that tokens<n predict n
            shift_logits = lm_logits[...,:-1,:].contiguous()
            shift_labels = labels[...,1:].contiguous()
            # Flatten the tokens
            loss_fct = CrossEntroptLoss(ignore_index=0)
            loss = loss_fct(shift_logits.view(-1,shift_logits.size(-1)), shift_labels.view(-1))
            
        return lm_logits, loss
    
    def encode(self, input, input_mask):
        x = self.positional_encoding(self.embedding(input))
        for encoder in self.encoders:
            x = encoder(x, input_mask)
            
        return x
    
    def decode(self, encode_output, encoder_mask, target, target_mask):
        x = self.positional_encoding(self.embedding(input))
        for encoder in self.encoders:
            x = encoder(x, input_mask)
        
        target = self.positional_encoding(self.embedding(target))
        for decoder in self.decoders:
            target = decoder(target, x, target_mask, input_mask)
        
        lm_logits = self.generator(target)
        
        return lm_logits