<a href="https://colab.research.google.com/github/heejvely/NLP_models/blob/main/Transformer_%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

* 코드 참조: https://paul-hyun.github.io/transformer-02/

0. Imports

In [1]:
# sentencepiece import
!pip install sentencepiece
!pip install wget

Collecting sentencepiece
  Downloading sentencepiece-0.1.96-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[?25l[K     |▎                               | 10 kB 36.8 MB/s eta 0:00:01[K     |▌                               | 20 kB 21.7 MB/s eta 0:00:01[K     |▉                               | 30 kB 16.8 MB/s eta 0:00:01[K     |█                               | 40 kB 15.3 MB/s eta 0:00:01[K     |█▍                              | 51 kB 8.7 MB/s eta 0:00:01[K     |█▋                              | 61 kB 9.4 MB/s eta 0:00:01[K     |██                              | 71 kB 9.6 MB/s eta 0:00:01[K     |██▏                             | 81 kB 10.8 MB/s eta 0:00:01[K     |██▍                             | 92 kB 10.4 MB/s eta 0:00:01[K     |██▊                             | 102 kB 8.5 MB/s eta 0:00:01[K     |███                             | 112 kB 8.5 MB/s eta 0:00:01[K     |███▎                            | 122 kB 8.5 MB/s eta 0:00:01[K     |███▌    

In [4]:
import os
import numpy as np
import math
import matplotlib.pyplot as plt
import json
import pandas as pd
from IPython.display import display
from tqdm import tqdm, tqdm_notebook, trange
import sentencepiece as spm
import wget

import torch
import torch.nn as nn
import torch.nn.functional as F

1. Vocab

sentencepiece를 활용해 만든 vocab을 이용해 텍스트를 입력 tensor로 변경

In [2]:
import sentencepiece as spm
# vocab loading
vocab_file = '/content/drive/MyDrive/colab/NLP_모델/kowiki_corpus/kowiki.model'
vocab = spm.SentencePieceProcessor()
vocab.load(vocab_file)

True

* SentencePieceProcessor class 참고
: https://github.com/google/sentencepiece/blob/master/doc/api.md

In [None]:
help(spm.SentencePieceProcessor)

Help on class SentencePieceProcessor in module sentencepiece:

class SentencePieceProcessor(builtins.object)
 |  SentencePieceProcessor(model_file=None, model_proto=None, out_type=<class 'int'>, add_bos=False, add_eos=False, reverse=False, enable_sampling=False, nbest_size=-1, alpha=0.1)
 |  
 |  Methods defined here:
 |  
 |  Decode(self, input)
 |      Decode processed id or token sequences.
 |  
 |  DecodeIds = DecodeIdsWithCheck(self, ids)
 |  
 |  DecodeIdsAsSerializedProto = DecodeIdsAsSerializedProtoWithCheck(self, ids)
 |  
 |  DecodeIdsAsSerializedProtoWithCheck(self, ids)
 |  
 |  DecodeIdsWithCheck(self, ids)
 |  
 |  DecodePieces(self, pieces)
 |  
 |  DecodePiecesAsSerializedProto(self, pieces)
 |  
 |  Detokenize = Decode(self, input)
 |  
 |  Encode(self, input, out_type=None, add_bos=None, add_eos=None, reverse=None, enable_sampling=None, nbest_size=None, alpha=None)
 |      Encode text input to segmented ids or tokens.
 |      
 |      Args:
 |      input: input string

2. Config

설정을 json 형태로 저장하고, 이를 읽어서 처리하는 간단한 클래스

In [5]:
"""configuration json을 읽어들이는 class"""
class Config(dict):
  __getattr__ = dict.__getitem__
  __setarrt__ = dict.__setitem__

  @classmethod
  def load(cls, file):
    with open(file, 'r') as f:
      config = json.loads(f.read())
      return Config(config)

In [6]:
config = Config({
    'n_enc_vocab':len(vocab),
    'n_dec_vocab':len(vocab),
    'n_enc_seq':256,
    'n_dec_seq':256,
    'n_layer':6,
    'd_hidn':256,
    'i_pad':0,
    'd_ff':1024,
    'n_head':4,
    'd_head':64,
    'dropout':0.1,
    'layer_norm_epsilon':1e-12
})
print(config)

{'n_enc_vocab': 8007, 'n_dec_vocab': 8007, 'n_enc_seq': 256, 'n_dec_seq': 256, 'n_layer': 6, 'd_hidn': 256, 'i_pad': 0, 'd_ff': 1024, 'n_head': 4, 'd_head': 64, 'dropout': 0.1, 'layer_norm_epsilon': 1e-12}


3. Common Class

Transformer 구현하기_1.ipynb 에서 구현한 'Position Embedding','Multi-Head Attention','Feed Forward'의 코드

#### Position Encoding

Position Embedding의 초기 값을 구하는 함수
1. 각 position별 hidden index별 angle 값을 구합니다.
2. hidden 짝수 index의 angle값의 sin값을 구합니다.
3. hidden 홀수 index의 angle값의 cos값을 구합니다.

In [8]:
"""sinusoid position encoding"""
def get_sinusoid_encoding_table(n_seq, d_hidn):
  def cal_angle(position, i_hidn):
    return position / np.power(10000, 2 * (i_hidn // 2) / d_hidn)
  def get_posi_angle_vec(position):
    return [cal_angle(position, i_hidn) for i_hidn in range(d_hidn)]

  sinusoid_table = np.array([get_posi_angle_vec(i_seq) for i_seq in range(n_seq)])  # 1. 각 position별 hidden index별 angle 값을 구합니다.
  sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])                         # 2. hidden 짝수 index의 angle값의 sin값을 구합니다.
  sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])                         # 3. hidden 홀수 index의 angle값의 cos값을 구합니다.

  return sinusoid_table

#### Attention Pad Mask

Attention을 구할 때 Padding 부분을 제외하기 위한 Mask를 구하는 함수

1. K의 값 중에 Pad인 부분을 True로 변경합니다.(나머지는 False)
2. 구해진 값의 크기를 Q-len, K-len 되도록 변경합니다.

In [9]:
"""attention pad mask"""
def get_attn_pad_mask(seq_q, seq_k, i_pad):
  batch_size, len_q = seq_q.size()
  batch_size, len_k = seq_k.size()
  pad_attn_mask = seq_k.data.eq(i_pad)                                              # 1. K의 값 중에 Pad인 부분을 True로 변경합니다.(나머지는 False)
  pad_attn_mask = pad_attn_mask.unsqueeze(1).expand(batch_size, len_q, len_k)       # 2. 구해진 값의 크기를 Q-len, K-len 되도록 변경합니다.
  return pad_attn_mask

#### Attention Decoder Mask

Decoder의 'Masked Multi Head Attention'에서 사용할 Mask를 구하는 함수입니다.

현재 단어와 이전 단어는 볼 수 있고 다음 단어는 볼 수 없도록 Masking 합니다.

1. 모든 값이 1인 Q-len, K-leb 테이블을 생성합니다.
2. 대각선을 기준으로 아래쪽을 0으로 만듭니다.

In [10]:
"""attention decoder mask"""
def get_attn_decoder_mask(seq):
  subsequent_mask = torch.ones_like(seq).unsqueeze(-1).expand(seq.size(0), seq.size(1), seq.size(1)) # 1. 모든 값이 1인 Q-len, K-leb 테이블을 생성합니다.
  subsequent_mask = subsequent_mask.triu(diafonal=1)                                                 # 2. 대각선을 기준으로 아래쪽을 0으로 만듭니다.
  return subsequent_mask

#### Scaled Dot Product Attention

Scaled Dot Product Attention을 구하는 클래스

1. Q * K.transpose를 구합니다.
2. K-dimention에 루트를 취한 값으로 나눠 줍니다.
3. Mask를 적용합니다.
4. Softmax를 취해 각 단어의 가중치 확률분포 attn_prob를 구합니다.
5. attn_prob * V를 구합니다. 구한 값은 Q에 대한 V의 가중치 합 벡터입니다.

In [11]:
"""scale dot product attention"""
class ScaledDotProductAttention(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config
    self.dopout= nn.Dropout(config.dropout)
    self.scale = 1 / (self.config.d_head ** 0.5)

  def forward(self, Q, K, V, attn_mask):
    scores = torch.matmul(Q, K.transpose(-1, -2))     # 1. Q * K.transpose를 구합니다.
    scores = scores.mul(self.scale)                   # 2. K-dimention에 루트를 취한 값으로 나눠 줍니다.
    scores.masked_fill_(attn_mask, -1e9)              # 3. Mask를 적용합니다.

    attn_prob = nn.Softmax(dim= -1)(scores)           # 4. Softmax를 취해 각 단어의 가중치 확률분포 attn_prob를 구합니다.
    attn_prob = self.dropout(attn_prob)               # 5. attn_prob * V를 구합니다. 구한 값은 Q에 대한 V의 가중치 합 벡터입니다.

    context = torch.matmul(attn_prob, V)

    return context, attn_prob

#### Multi-Head Attention

Multi-Head Attention을 구하는 클래스

1. Q * W_Q를 한 후 multi-head로 나눕니다.
2. K * W_K를 한 후 multi-head로 나눕니다.
3. V * W_V를 한 후 multi-head로 나눕니다.
4. ScaledDotProductAttention 클래스를 이용해 각 head 별 Attention을 구합니다.
5. 여러 개의 head를 1개로 합칩니다.
6. Linear를 취해 최종 Multi-Head Attention값을 구합니다.

In [12]:
"""multi head attention"""
class MultiHeadAttention(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.W_Q = nn.Linear(self.config.d_hidn, self.config.n_head * self.config.d_head)
    self.W_K = nn.Linear(self.config.d_hidn, self.config.n_head * self.config.d_head)
    self.W_V = nn.Linear(self.config.d_hidn, self.config.n_head * self.config.d_head)
    self.scaled_dot_attn = ScaledDotProductAttention(self.config)
    self.linear = nn.Linear(self.config.n_head * self.config.d_head, self.config.d_hidn)
    self.dropout = nn.Dropout(config.dropout)

  def forward(self, Q, K, V, attn_mask):
    batch_size = Q.size(0)
    q_s = self.W_Q(Q).view(batch_size, -1, self.config.n_head, self.config.d_head).transpose(1,2)                   # 1. Q * W_Q를 한 후 multi-head로 나눕니다.
    k_s = self.W_K(K).view(batch_size, -1, self.config.n_head, self.config.d_head).transpose(1,2)                   # 2. K * W_K를 한 후 multi-head로 나눕니다.
    v_s = self.W_V(V).view(batch_size, -1, self.config.n_head, self.config.d_head).transpose(1,2)                   # 3. V * W_V를 한 후 multi-head로 나눕니다.

    attn_mask = attn_mask.unsqueeze(1).repeat(1, self.config.n_head, 1, 1)

    context, attn_prob = self.scaled_dot_attn(q_s, k_s, v_s, attn_mask)                                             # 4. ScaledDotProductAttention 클래스를 이용해 각 head 별 Attention을 구합니다.
    context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.config.n_head * self.config.d_head)    # 5. 여러 개의 head를 1개로 합칩니다.
    output = self.linear(context)                                                                                   # 6. Linear를 취해 최종 Multi-Head Attention값을 구합니다.
    output = self.dropout(output)
    
    return output, attn_prob


#### Feed Forward

FeedForward를 처리하는 클래스입니다.

1. Linear를 실행하여 shape을 d_ff(hidden * 4)크기로 키웁니다.
2. activation 함수(relu or gelu)를 실행합니다.
3. Linear를 실행하여 shape을 hidden 크기로 줄입니다.

In [13]:
"""feed forward"""
class PoswiseFeedForwardNet(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.conv1 = nn.Conv1d(in_channels = self.config.d_hidn, out_channels = self.config.d_ff, kernel_size=1)
    self.conv2 = nn.Conv1d(in_channels = self.congif.d_ff, out_channels = self.config.d_hidn, kernel_size=1)
    self.active = F.gelu
    self.dropout = nn.Dropout(config.dropout)

  def forward(self, inputs):
    output = self.conv1(inputs.transpose(1, 2))     # 1. Linear를 실행하여 shape을 d_ff(hidden * 4)크기로 키웁니다.
    output = self.active(output)                    # 2. activation 함수(relu or gelu)를 실행합니다.

    output = self.conv2(output).transpose(1, 2)     # 3. Linear를 실행하여 shape을 hidden 크기로 줄입니다.
    output = self.dropout(output)

    return output

4. Encoder

#### Encoder layer

Encoder에서 루프를 돌며 처리할 수 있도록 EncoderLayer를 정의하고 여러 개 만들어서 실행

1. Multi-Head Attention을 수행합니다. Q, K, V 모두 동일한 값을 사용하는 self-attention입니다.
2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행합니다.
3. 2번의 결과를 입력으로 Feed Forward를 실행합니다.
4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.

In [14]:
"""encoder layer"""
class EncoderLayer(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.self_attn = MultiHeadAttention(self.config)
    self.layer = norm1 = nn.LayerNorm(self.config.d_hidn, eps = self.config.layer_norm_epsilon)
    self.pos_ffn = PoswiseFeedForwardNet(self.config)
    self.layer_norm2 = nn.LayerNorm(self.config.d_hidn, eps = self.config.layer_norm_epsilon)

  def forward(self, inputs, attn_mask):
    attn_outputs, attn_prob = self.self_attn(inputs, inputs, inputs,attn_mask)    # 1. Multi-Head Attention을 수행합니다. Q, K, V 모두 동일한 값을 사용하는 self-attention입니다.
    attn_outputs = self.layer_norm1(inputs + attn_outputs)                        # 2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행합니다.

    ffn_outputs = self.pos_ffn(attn_outputs)                                      # 3. 2번의 결과를 입력으로 Feed Forward를 실행합니다.
    ffn_outputs = self.layer_norm2(ffn_outputs + attn_outputs)                    # 4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.

    return ffn_outputs, attn_prob

#### Encoder
Encoder 클래스

1. 입력에 대한 Position 값을 구합니다.
2. Input Embedding과 Position Embedding을 구한 후 더합니다.
3. 입력에 대한 attention pad mask를 구합니다.
4. for 루프를 돌며 각 layer를 실행합니다. layer의 입력은 이전 layer의 출력 값입니다.

In [18]:
"""encoder"""
class Encoder(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.enc_emb = nn.Embedding(self.config.n_enc_vocav, self.config.d_hidn)
    sinusoid_table = torch.FloatTensort(get_sinusoid_encoding_table(self.config.n_enc_seq + 1, self.config.d_hidn))
    self.pos_emb = nn.Embedding.from_pretrained(sinusoid_table, freeze=True)

    self.layers = nn.ModuleList([EncoderLayer(self.config) for _ in range(self, config.n_layer)])

  def forward(self, inputs):
    # 1. 입력에 대한 Position 값을 구합니다.
    positions = torch.arange(inputs.size(1), device = inputs.device, dtype=inputs.dtype)
    pos_mask = inputs.eq(self.config.i_pad)
    positions.masked_fill_(pos_mask, 0)

    outputs = self.enc_emb(inputs) + self.pos_emb(positions)          # 2. Input Embedding과 Position Embedding을 구한 후 더합니다.

    attn_mask = get_attn_pad_mask(inputs, inputs, self.config.i_pad)  # 3. 입력에 대한 attention pad mask를 구합니다.

    attn_probs = []
    for layer in self.layers:
      outputs, attn_prob = layer(outputs, attn_mask)                  # 4. for 루프를 돌며 각 layer를 실행합니다. layer의 입력은 이전 layer의 출력 값입니다.
      attn_probs.append(attn_prob)

    return outputs, attn_probs

5. Decoder

#### Decoder Layer

Decoder에서 루프를 돌며 처리할 수 있도록 DecoderLayer를 정의하고 여러 개 만들어서 실행

1. Multi-Head Attention을 수행합니다.
- Q, K, V 모두 동일한 값을 사용하는 self-attention입니다.
2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행합니다.
3. Encoder-Decoder Multi-Head Attention을 수행합니다.
- Q: 2번의 결과
- K, V: Encoder 결과
4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.
5. 4번의 결과를 입력으로 FeedForward를 실행합니다.
6. 5번의 결과와 4번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.

In [16]:
"""decoder layer"""
class DecoderLayer(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.self_attn = MultiHeadAttention(self.config)
    self.layer_norm1 = nn.LayerNorm(self.config.d_hidn, eps = self.config.layer_norm_epsilon)
    self.dec_enc_attn = MultiHeadAttention(self.config)
    self.layer_norm2 = nn.LayerNorm(self.config.d_hidn, eps = self.config.layer_norm_epsilon)
    self.pos_ffn = PoswiseFeedForwardNet(self.config)
    self.layer_norm3 = nn.LayerNorm(self.config.d_hidn, eps = self.config.layer_norm_epsilon)

  def forward(self, dec_inputs, enc_outputs, self_attn_mask, dec_enc_attn_mask):
    self_attn_outputs, self_attn_prob = self.self_attn(dec_inputs, dec_inputs, dec_inputs, self_attn_mask)      # 1. Multi-Head Attention을 수행합니다. - Q, K, V 모두 동일한 값을 사용하는 self-attention입니다.
    self_attn_outputs = self.layer_norm1(dec_inputs + self_attn_outputs)                                        # 2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행합니다.

    # 3. Encoder-Decoder Multi-Head Attention을 수행합니다.
    # - Q: 2번의 결과
    # - K, V: Encoder 결과
    dec_enc_attn_outputs, dec_enc_attn_prob = self.dec_enc_attn(self_attn_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
    dec_enc_attn_outputs = self.layer_norm2(self_attn_outputs + dec_enc_attn_outputs)                           # 4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.

    ffn_outputs = self.pos_ffn(dec_enc_attn_outputs)                                                            # 5. 4번의 결과를 입력으로 FeedForward를 실행합니다.
    ffn_outputs = self.layer_norm3(dec_enc_attn_outputs + ffn_outputs)                                          # 6. 5번의 결과와 4번의 결과(residual)을 더한 후 LayerNorm을 실행합니다.
    
    return ffn_outputs, self_attn_prob, dec_enc_attn_prob

#### Decoder
Decoder 클래스

1. 입력에 대한 Position 값을 구합니다.
2. Input Embedding과 Position Embedding을 구한 후 더합니다.
3. 입력에 대한 attention pad mask를 구합니다.
4. 입력에 대한 decoder attention mask를 구합니다.
5. attention pad mask와 decoder attention mask 중 1곳이라도 mask 되어 있는 부분이 mask 되도록 attention mask를 구합니다.
6. Q(decoder input), K(encoder output)에 대한 attention mask를 구합니다.
7. for 루프를 돌며 각 layer를 실행합니다.
- layer의 입력은 이전 layer의 출력 값입니다.

In [17]:
"""decoder"""
class Decoder(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.dec_emb = nn.Embedding(self.config.n_dec_vocab, self.config.d_hidn)
    sinusoid_table = torch.FloatTensor(get_sinusoid_encoding_table(self.config.n_dec_seq + 1, self.config.d_hidn))
    self.pos_emb = nn.Embedding.from_pretrained(sinusoid_table, freeze = True)

    self.layers = nn.ModuleList([DecoderLayer(self.config) for _ in range(self.config.n_layer)])

  def forward(self, dec_inputs, enc_inputs, enc_outputs):
    # 1. 입력에 대한 Position 값을 구합니다.
    positions = torch.arnage(dec_inputs.size(1), device = dec_inputs.device, dtype = dec_inputs.dtype).expand(dec_inputs.size(0), dec_inputs.size(1).contiguous()+ 1)
    pos_mask = dec_inputs.eq(self.config.i_pad)
    positions.masked_fill_(pos_mask, 0)

    dec_outputs = self.dec_emb(dec_inputs) + self.pos_emb(positions)                    # 2. Input Embedding과 Position Embedding을 구한 후 더합니다.

    dec_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs, self.config.i_pad)    # 3. 입력에 대한 attention pad mask를 구합니다.
    dec_attn_decoder_mask = get_attn_decoder_mask(dec_inputs)                           # 4. 입력에 대한 decoder attention mask를 구합니다.
    dec_self_attn_mask = torch.gt((dec_attn_pad_mask + dec_attn_decoder_mask), 0)       # 5. attention pad mask와 decoder attention mask 중 1곳이라도 mask 되어 있는 부분이 mask 되도록 attention mask를 구합니다.
    dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs, self.config.i_pad)    # 6. Q(decoder input), K(encoder output)에 대한 attention mask를 구합니다.

    self_attn_probs, dec_enc_attn_probs = [], []
    # 7. for 루프를 돌며 각 layer를 실행합니다.
    # - layer의 입력은 이전 layer의 출력 값입니다.
    for layer in self.layers:
      dec_outputs, self_attn_prob, dec_enc_attn_prob = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
      self_attn_probs.append(self_attn_prob)
      dec_enc_attn_probs.append(dec_enc_attn_prob)
    return dec_outputs, self_attn_probs, dec_enc_attn_probs

6. Transformer

Transformer 클래스

1. Encoder Input을 입력으로 Encoder를 실행합니다.
2. Encoder Output과 Decoder Input을 입력으로 Decoder를 실행합니다.

In [19]:
"""transformer"""
class Transformer(nn.Module):
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.encoder = Encoder(self.config)
    self.decoder = Decoder(self.config)

  def forward(self, enc_inputs, dec_inputs):
    enc_outputs, enc_self_attn_probs = self.encoder(enc_inputs)                                                 # 1. Encoder Input을 입력으로 Encoder를 실행합니다.
    dec_outputs, dec_self_attn_probs, dec_enc_attn_probs = self.decoder(dec_inputs, enc_inputs, enc_outputs)    # 2. Encoder Output과 Decoder Input을 입력으로 Decoder를 실행합니다.
    return dec_outputs, enc_self_attn_probs, dec_self_attn_probs, dec_enc_attn_probs