In [3]:
import numpy as np
import math

import torch
import torch.nn as nn

# Transformer 구조 구현
### Reference
- https://kaya-dev.tistory.com/8
- https://kaya-dev.tistory.com/11

## Class 구현
- PositionalEncoding
- Self-Attention
- Multi-head Attention
- Layer Norm layer
- Feed-Forward layer

In [2]:
class PositionalEncoding(nn.Module):
    """Fixed (non-trainable) sinusoidal positional encoding table.

    A ``[max_len, d_model]`` table is computed once in ``__init__``: even
    columns hold ``sin(pos / 10000^(2i / d_model))`` and odd columns hold the
    matching ``cos`` term. ``forward`` returns the first ``seq_len`` rows so
    the caller can add them to a ``[batch, seq_len, d_model]`` embedding via
    broadcasting.
    """

    def __init__(self, max_len, d_model, device):
        """
        :param max_len: number of positions (rows) to precompute
        :param d_model: width of each encoding vector (columns)
        :param device: device on which the table is allocated
        """
        super(PositionalEncoding, self).__init__()

        self.encoding = torch.zeros(max_len, d_model, device=device)
        self.encoding.requires_grad = False  # constant table, no gradient needed

        pos = torch.arange(0, max_len, device=device)
        pos = pos.float().unsqueeze(dim=1)  # [max_len, 1]

        _2i = torch.arange(0, d_model, step=2, device=device).float()
        angle = pos / (10000 ** (_2i / d_model))  # [max_len, ceil(d_model / 2)]

        self.encoding[:, 0::2] = torch.sin(angle)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the angle table to the number of odd columns.
        self.encoding[:, 1::2] = torch.cos(angle[:, : d_model // 2])

    def forward(self, x):
        """Return the encodings for the first ``seq_len`` positions.

        :param x: tensor whose dimension 1 is the sequence length; both token
            ids ``[batch, seq_len]`` and embedded input
            ``[batch, seq_len, d_model]`` are accepted
        :return: tensor of shape ``[seq_len, d_model]``
        :raises ValueError: if the sequence is longer than the precomputed table
        """
        # Only dimension 1 is needed; unpacking x.size() into exactly two
        # values would fail for an embedded (3-D) input.
        seq_len = x.size(1)
        if seq_len > self.encoding.size(0):
            raise ValueError(
                "sequence length %d exceeds max_len %d" % (seq_len, self.encoding.size(0))
            )

        # Slice the first seq_len rows of the 2-D table.
        return self.encoding[:seq_len, :]

In [4]:
class ScaleDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d)) V``.

    Query : the sequence that is attending
    Key   : the sequence whose relationship with the query is scored
    Value : the sequence that is mixed according to those scores
    """

    def __init__(self):
        super(ScaleDotProductAttention, self).__init__()

        # Normalise over the key axis (last dimension of the score matrix).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None, e=1e-12):
        """
        :param q: queries, ``[batch_size, head, len_q, d_tensor]``
        :param k: keys, ``[batch_size, head, len_k, d_tensor]``
        :param v: values, ``[batch_size, head, len_k, d_tensor]``
        :param mask: optional tensor broadcastable to the score matrix
            ``[batch_size, head, len_q, len_k]``; positions where it equals 0
            receive (numerically) zero attention weight
        :param e: kept only for backward compatibility of the signature; it is
            no longer used as the fill value because ``-1e-12`` does not mask
        :return: ``(attended values, attention weights)``
        """
        d_tensor = k.size(-1)

        # Step 1 : similarity via Q x K^T. The last two axes must really be
        # swapped; view() would only reinterpret the memory layout.
        k_t = k.transpose(-2, -1)
        score = (q @ k_t) / math.sqrt(d_tensor)

        # Step 2 : masking (optional). A large finite negative is used rather
        # than -inf so that a fully masked row yields a uniform distribution
        # instead of NaN after softmax.
        if mask is not None:
            score = score.masked_fill(mask == 0, torch.finfo(score.dtype).min)

        # Step 3 : softmax turns the scores into weights in [0, 1]
        score = self.softmax(score)

        # Step 4 : weighted sum of the values
        v = score @ v

        return v, score

In [6]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ScaleDotProductAttention.

    q, k and v are linearly projected, split into ``n_head`` heads, attended
    independently per head, merged back and passed through an output
    projection.
    """

    def __init__(self, d_model, n_head):
        """
        :param d_model: model width; must be divisible by ``n_head``
        :param n_head: number of attention heads
        :raises ValueError: if ``d_model`` is not divisible by ``n_head``
        """
        super(MultiHeadAttention, self).__init__()
        if d_model % n_head != 0:
            raise ValueError(
                "d_model (%d) must be divisible by n_head (%d)" % (d_model, n_head)
            )
        self.n_head = n_head
        self.attention = ScaleDotProductAttention()
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)

    def split(self, tensor):
        """
        split tensor by number of head

        param tensor = [batch_size, length, d_model]
        out = [batch_size, head, length, d_tensor]

        Example: with d_model = 512 and 8 heads, d_tensor = 512 / 8 = 64.
        """
        batch_size, length, d_model = tensor.size()  # [B, L, d_model]

        d_tensor = d_model // self.n_head

        # Cut the feature axis into heads first, then move the head axis in
        # front of the length axis. Viewing straight to [B, H, L, d_tensor]
        # would mix different sequence positions into one head.
        tensor = tensor.view(batch_size, length, self.n_head, d_tensor).transpose(1, 2)

        return tensor

    def concat(self, tensor):
        """
        inverse function of self.split(tensor = torch.Tensor)

        param tensor = [batch_size, head, length, d_tensor]
        out = [batch_size, length, d_model]
        """
        batch_size, head, length, d_tensor = tensor.size()
        d_model = d_tensor * head

        # Undo the transpose from split(); contiguous() is required before
        # view() because transpose() only changes strides.
        tensor = tensor.transpose(1, 2).contiguous().view(batch_size, length, d_model)
        return tensor

    def forward(self, q, k, v, mask=None):
        """
        :param q: queries, ``[batch_size, len_q, d_model]``
        :param k: keys, ``[batch_size, len_k, d_model]``
        :param v: values, ``[batch_size, len_k, d_model]``
        :param mask: optional mask forwarded unchanged to the attention module
        :return: attended output, ``[batch_size, len_q, d_model]``
        """
        # Step 1 : linear projections
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # Step 2 : split tensor by number of heads
        q, k, v = self.split(q), self.split(k), self.split(v)

        # Step 3 : attention output per head (the weights are not returned)
        out, _ = self.attention(q, k, v, mask=mask)

        # Step 4 : concat and pass to linear layer
        out = self.concat(out)
        out = self.w_concat(out)

        return out

In [7]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with learnable scale and shift."""

    def __init__(self, d_model, eps=1e-12):
        """
        :param d_model: size of the last (normalised) dimension
        :param eps: small constant added to the variance so the denominator
            can never be zero
        """
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))  # scaling parameter
        self.beta = nn.Parameter(torch.zeros(d_model))  # shift parameter
        self.eps = eps

    def forward(self, x):
        """
        :param x: tensor whose last dimension has size ``d_model``
        :return: tensor of the same shape, normalised along the last dimension
        """
        # -1 = last dimension; keepdim=True keeps the reduced axis so the
        # statistics broadcast against x below.
        mean = x.mean(-1, keepdim=True)
        # Biased variance with eps inside the square root: the standard
        # layer-norm formulation, which stays finite even for a single element.
        var = x.var(-1, unbiased=False, keepdim=True)

        # self.eps, not a bare `eps`, which is undefined in this scope.
        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta

        return out

In [8]:
class PositionwiseFeadFoward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    The first linear layer maps ``d_model`` to ``hidden`` features and the
    second maps back to ``d_model``.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        """
        :param d_model: input and output feature size
        :param hidden: feature size between the two linear layers
        :param drop_prob: dropout probability applied after the ReLU
        """
        super().__init__()
        self.linear1 = nn.Linear(in_features=d_model, out_features=hidden)
        self.linear2 = nn.Linear(in_features=hidden, out_features=d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x):
        """Apply the block to ``x`` (last dimension ``d_model``) and return the result."""
        activated = self.relu(self.linear1(x))
        return self.linear2(self.dropout(activated))

## Encoder 및 Decoder 구현

#### Encoder

In [10]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output passes through dropout, is added to the sub-layer
    input (residual connection) and the sum is layer-normalised.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: model width
        :param ffn_hidden: hidden width of the feed-forward sub-layer
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used throughout the layer
        """
        super(EncoderLayer, self).__init__()

        # Multi-Head Attention
        self.attention = MultiHeadAttention(d_model, n_head)

        # Layer Norm layer (after MHA)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Feed-Forward
        self.ffn = PositionwiseFeadFoward(d_model, ffn_hidden, drop_prob)

        # Layer Norm layer (after FFN)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, src_mask):
        """
        :param x: layer input
        :param src_mask: mask forwarded to the self-attention module
        :return: layer output, same shape as ``x``
        """
        # Step 1 : Compute Multi-Head self-attention
        _x = x
        x = self.attention(q=x, k=x, v=x, mask=src_mask)

        # Step 2 : Dropout on the sub-layer output, then Add & Layer Norm.
        # Dropping after the norm would also zero out parts of the residual
        # stream that the next sub-layer relies on.
        x = self.dropout1(x)
        x = self.norm1(x + _x)

        # Step 3 : Compute Feed-Forward Network
        _x = x
        x = self.ffn(x)

        # Step 4 : Dropout, then Add & Layer Norm
        x = self.dropout2(x)
        x = self.norm2(x + _x)

        return x

In [11]:
class Encoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of EncoderLayer."""

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device,
                 padding_idx=1):
        """
        :param enc_voc_size: source vocabulary size
        :param max_len: number of positions precomputed by the positional encoding
        :param d_model: model width
        :param ffn_hidden: hidden width of each feed-forward sub-layer
        :param n_head: number of attention heads
        :param n_layers: number of stacked encoder layers
        :param drop_prob: dropout probability passed to every layer
        :param device: device for the positional-encoding table
        :param padding_idx: vocabulary index of the padding token passed to
            ``nn.Embedding`` (default 1, the value that used to be hard-coded)
        """
        super().__init__()

        # Embedding
        self.embed = nn.Embedding(num_embeddings=enc_voc_size, embedding_dim=d_model,
                                  padding_idx=padding_idx)

        # Positional Encoding
        self.pe = PositionalEncoding(max_len=max_len, d_model=d_model, device=device)

        # Stack of encoder layers
        self.layers = nn.ModuleList([EncoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layers)])

    def forward(self, x, src_mask):
        """
        :param x: source token ids, ``[batch_size, seq_len]``
        :param src_mask: mask forwarded to every encoder layer
        :return: encoder output, ``[batch_size, seq_len, d_model]``
        """
        tokens = x

        # Step 1 : Compute Embedding (token ids => vectors)
        x = self.embed(tokens)

        # Step 2 : Get Positional Encoding. The positional module reads the
        # sequence length from a [batch_size, seq_len] input, so it is given
        # the token ids rather than the 3-D embedded tensor.
        x_pe = self.pe(tokens)

        # Step 3 : Embedding + Positional Encoding (broadcast over the batch)
        x = x + x_pe

        # Step 4 : Compute Encoder layers
        for layer in self.layers:
            x = layer(x, src_mask)

        return x

#### Decoder

- 첫번째 MHA는 Encoder와 동일.
- encoder의 결과 값이 Decoder의 두번째 MHA에서 Key, Value로 들어감.
- 나머지 Add & Norm / FFN 동일

In [13]:
class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer output passes through dropout, is added to the sub-layer
    input (residual connection) and the sum is layer-normalised.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        """
        :param d_model: model width
        :param ffn_hidden: hidden width of the feed-forward sub-layer
        :param n_head: number of attention heads
        :param drop_prob: dropout probability used throughout the layer
        """
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)

        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.enc_dec_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)

        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeadFoward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)

        self.norm3 = LayerNorm(d_model=d_model)
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, dec, enc, trg_mask, src_mask):
        """
        :param dec: decoder-side input of this layer
        :param enc: encoder output used as key/value of the encoder-decoder
            attention; when ``None`` that sub-layer is skipped
        :param trg_mask: mask for the decoder self-attention (Transformer.forward
            in this file builds it from the target padding mask combined with
            the lower-triangular look-ahead mask)
        :param src_mask: mask for the encoder-decoder attention (Transformer.forward
            builds it from target-query / source-key padding)
        :return: layer output, same shape as ``dec``
        """
        # Step 1 : Compute self-attention
        _x = dec
        x = self.self_attention(q=dec, k=dec, v=dec, mask=trg_mask)

        # Step 2 : Dropout on the sub-layer output, then Add & Norm
        x = self.dropout1(x)
        x = self.norm1(x + _x)

        # Step 3 : Compute encoder-decoder attention
        if enc is not None:
            _x = x

            # Unlike self-attention:
            # Query : output of the decoder self-attention block (x, not the raw `dec`)
            # Key   : Encoder output
            # Value : Encoder output
            x = self.enc_dec_attention(q=x, k=enc, v=enc, mask=src_mask)

            # Dropout, then Add & Norm
            x = self.dropout2(x)
            x = self.norm2(x + _x)

        # Step 4 : FFN
        _x = x
        x = self.ffn(x)

        # Step 5 : Dropout, then Add & Norm
        x = self.dropout3(x)
        x = self.norm3(x + _x)

        return x


In [14]:
class Decoder(nn.Module):
    """Token embedding + positional encoding, a stack of DecoderLayer, and a
    final linear projection to the target vocabulary."""

    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layer, drop_prob, device,
                 padding_idx=1):
        """
        :param dec_voc_size: target vocabulary size (also the output width)
        :param max_len: number of positions precomputed by the positional encoding
        :param d_model: model width
        :param ffn_hidden: hidden width of each feed-forward sub-layer
        :param n_head: number of attention heads
        :param n_layer: number of stacked decoder layers
        :param drop_prob: dropout probability passed to every layer
        :param device: device for the positional-encoding table
        :param padding_idx: vocabulary index of the padding token passed to
            ``nn.Embedding`` (default 1, the value that used to be hard-coded)
        """
        super().__init__()

        self.embed = nn.Embedding(num_embeddings=dec_voc_size, embedding_dim=d_model,
                                  padding_idx=padding_idx)

        self.pe = PositionalEncoding(max_len=max_len, d_model=d_model, device=device)

        self.layers = nn.ModuleList([DecoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layer)])

        # Output projection to vocabulary logits
        self.linear = nn.Linear(d_model, dec_voc_size)

    def forward(self, trg, src, trg_mask, src_mask):
        """
        :param trg: target token ids, ``[batch_size, trg_len]``
        :param src: encoder output, passed to every layer as ``enc``
        :param trg_mask: mask forwarded to each layer's self-attention
        :param src_mask: mask forwarded to each layer's encoder-decoder attention
        :return: logits, ``[batch_size, trg_len, dec_voc_size]``
        """
        tokens = trg

        # Step 1 : Compute Embedding
        trg = self.embed(tokens)

        # Step 2 : Get Positional Encoding. The positional module reads the
        # sequence length from a [batch_size, seq_len] input, so it is given
        # the token ids rather than the 3-D embedded tensor.
        trg_pe = self.pe(tokens)

        # Step 3 : Embedding + PE (broadcast over the batch)
        trg = trg + trg_pe

        # Step 4 : Compute Decoder layers
        for layer in self.layers:
            trg = layer(dec=trg, enc=src, trg_mask=trg_mask, src_mask=src_mask)

        # Step 5 : project to vocabulary logits (LM head)
        output = self.linear(trg)

        return output

## Transformer 구현(이해 더 필요)

In [15]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer that builds its own padding / look-ahead masks."""

    def __init__(self, src_pad_idx, trg_pad_idx, trg_sos_idx, enc_voc_size, dec_voc_size, d_model, n_head, max_len,
                 ffn_hidden, n_layers, drop_prob, device):
        """
        :param src_pad_idx: padding token id in the source vocabulary
        :param trg_pad_idx: padding token id in the target vocabulary
        :param trg_sos_idx: start-of-sequence token id of the target (stored only)
        :param enc_voc_size: source vocabulary size
        :param dec_voc_size: target vocabulary size
        :param d_model: model width
        :param n_head: number of attention heads
        :param max_len: number of positions precomputed by the positional encodings
        :param ffn_hidden: hidden width of the feed-forward sub-layers
        :param n_layers: number of layers in both the encoder and the decoder
        :param drop_prob: dropout probability
        :param device: device used for the positional tables and the look-ahead mask
        """
        super().__init__()
        # <PAD> / <SOS> indices
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.trg_sos_idx = trg_sos_idx

        # Encoder
        self.encoder = Encoder(enc_voc_size=enc_voc_size,
                               max_len=max_len,
                               d_model=d_model,
                               ffn_hidden=ffn_hidden,
                               n_head=n_head,
                               n_layers=n_layers,
                               drop_prob=drop_prob,
                               device=device)

        # Decoder (its constructor names the depth parameter `n_layer`)
        self.decoder = Decoder(dec_voc_size=dec_voc_size,
                               max_len=max_len,
                               d_model=d_model,
                               ffn_hidden=ffn_hidden,
                               n_head=n_head,
                               n_layer=n_layers,
                               drop_prob=drop_prob,
                               device=device)
        self.device = device

    def make_pad_mask(self, q, k, q_pad_idx=None, k_pad_idx=None):
        """Build a boolean mask that is False wherever the query or the key is padding.

        Padding positions must be excluded from the attention computation.

        :param q: query token ids, ``[batch_size, len_q]``
        :param k: key token ids, ``[batch_size, len_k]``
        :param q_pad_idx: padding id used for ``q``; defaults to ``self.src_pad_idx``
        :param k_pad_idx: padding id used for ``k``; defaults to ``self.src_pad_idx``
        :return: bool tensor ``[batch_size, 1, len_q, len_k]``; True = attend
        """
        if q_pad_idx is None:
            q_pad_idx = self.src_pad_idx
        if k_pad_idx is None:
            k_pad_idx = self.src_pad_idx

        # ne = "not equal": False at padding tokens, True elsewhere.
        k_valid = k.ne(k_pad_idx).unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, len_k]
        q_valid = q.ne(q_pad_idx).unsqueeze(1).unsqueeze(3)  # [batch_size, 1, len_q, 1]

        # Broadcasting expands both operands to [batch_size, 1, len_q, len_k].
        mask = q_valid & k_valid

        return mask

    def make_no_peak_mask(self, q, k):
        """Build the lower-triangular look-ahead mask.

        When predicting position t the decoder may only see positions up to t;
        everything after it is masked out, otherwise the model would be given
        the answer it is supposed to predict.

        :param q: tensor whose dimension 1 is ``len_q``
        :param k: tensor whose dimension 1 is ``len_k``
        :return: bool tensor ``[len_q, len_k]`` on ``self.device``; True = attend
        """
        len_q, len_k = q.size(1), k.size(1)

        # torch.tril keeps the lower triangle (including the diagonal).
        mask = torch.tril(torch.ones(len_q, len_k, device=self.device)).bool()

        return mask

    def forward(self, src, trg):
        """
        :param src: source token ids, ``[batch_size, src_len]``
        :param trg: target token ids, ``[batch_size, trg_len]``
        :return: decoder output
        """
        # Masks: each sequence is compared against its own padding index.
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        src_trg_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)
        trg_mask = (self.make_pad_mask(trg, trg, self.trg_pad_idx, self.trg_pad_idx)
                    & self.make_no_peak_mask(trg, trg))

        # Compute Encoder
        enc_src = self.encoder(src, src_mask)

        # Compute Decoder
        output = self.decoder(trg, enc_src, trg_mask, src_trg_mask)

        return output