In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# google sentencepiece 설치
!pip install sentencepiece

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.1.97


In [5]:
##### pytorch #####
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


##### 시각화 #####
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns 

##### 기본 모듈 #####
import pandas as pd
import numpy as np
import os
import random
import json
import math
import easydict
from pprint import pprint
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

##### 디버깅 #####
import pdb

##### cuda #####
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') # GPU 할당

##### 경고무시 #####
import warnings
warnings.filterwarnings(action='ignore')

import sentencepiece as spm

In [6]:
vocab_file = "/content/drive/MyDrive/2.Study/GPT/Data/kowiki.model"
vocab = spm.SentencePieceProcessor()
vocab.load(vocab_file)

True

In [10]:
config = easydict.EasyDict({
    "n_dec_vocab": len(vocab),
    "n_dec_seq": 256,
    "n_layer": 6,
    "d_hidn": 256,
    "i_pad": 0,
    "d_ff": 1024,
    "n_head": 4,
    "d_head": 64,
    "dropout": 0.1,
    "layer_norm_epsilon": 1e-12
                })

---
# Multi-head Attention
---

- 주로 텍스트의 문맥 정보를 파악하기 위한 목적으로 사용

- 입력된 문장을 여러개의 헤드로 나누고, 각 헤드가 문맥 정보를 학습

- 이후 여러개의 헤드의 출력값을 연결(concatenate)하여 최종 출력값을 만들게 된다.

- 위 과정을 통해 문장 내 단어들 사이의 의미론적 관계를 학습하고, 문장의 전체적인 의미를 파악할 수 있게 된다.

- 현재 생성하려는 단어와 이전 단어들간의 의미론적 관계를 파악하여 다음 단어를 예측


In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_hidn, n_head, d_head):
        super().__init__()
        self.d_hidn = d_hidn
        self.n_head = n_head
        self.d_head = d_head

        self.W_Q = nn.Linear(d_hidn, n_head * d_head)
        self.W_K = nn.Linear(d_hidn, n_head * d_head)
        self.W_V = nn.Linear(d_hidn, n_head * d_head)
        '''입력으로 들어온 차원(d_hidn)을 
           헤드의 개수(n_head)와 헤드의 차원(d_head)으로 나누어 가중치 행렬을 생성 '''

        self.scaled_dot_attn = ScaledDotProductAttention(d_head)
        self.linear = nn.Linear(n_head * d_head, d_hidn)
    
    def forward(self, Q, K, V, attn_mask):
        batch_size = Q.size(0)
        
        q_s = self.W_Q(Q).view(batch_size, -1, self.n_head, self.d_head).transpose(1,2) # (bs, n_head, q_seq_len, d_head)
        
        k_s = self.W_K(K).view(batch_size, -1, self.n_head, self.d_head).transpose(1,2) # (bs, n_head, k_seq_len, d_head)
        
        v_s = self.W_V(V).view(batch_size, -1, self.n_head, self.d_head).transpose(1,2) # (bs, n_head, v_seq_len, d_head)

        
        attn_mask = attn_mask.unsqueeze(1).repeat(1, self.n_head, 1, 1) # (bs, n_head, q_seq_len, k_seq_len)

        
        context, attn_prob = self.scaled_dot_attn(q_s, k_s, v_s, attn_mask)
        '''context = Attention score를 기반으로 하는 v_s의 가중 합 (bs, n_head, q_seq_len, d_head)
           attn_prob = 시퀀스에 대한 Attention Distribution (bs, n_head, q_seq_len, k_seq_len)'''
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_head * self.d_head) # (bs, n_head, q_seq_len, h_head * d_head)
        
        output = self.linear(context) # (bs, n_head, q_seq_len, e_embd)
        
        return output, attn_prob # (bs, q_seq_len, d_hidn), (bs, n_head, q_seq_len, k_seq_len)

---
# Scaleed Dot Product Attention
----

- Query, Key, Value 3가지 벡터를 사용하여 주어진 문장을 이해하는데 사용

- Query 
  - 각 단어가 다른 단어들과 어떤 관계를 맺고 있는지 나타내는 행렬

  - 행렬을 구성하기 위해, 입력 seq의 각 단어의 임베딩 벡터를 입력으로 받고  
    이를 선형 변환한 결과를 다시 하나의 벡터로 변환

  - 이 벡터는 각 단어의 의미적인 정보를 담고 있는 행렬의 열로 사용된다.

- Key 

  - 각 단어가 다른 단어들과 어떤 유사성을 가지고 있는지를 나타내는 행렬

- Value 

  - 입력 seq의 각 단어에 대한 정보를 나태나는 행렬

  - value 값은 Multi-head Attetntion에서 각 당너의 임베딩 벡터를 매핑하는데 사용


Query값과 Key값의 내적 결과를 Softmax함수를 통해 정규화하여 각 단어에 대한 가중치를 구하고

가중치를 이용해 value값을 Weighted Sum하여 Attention Context Vector를 계산

이를 통해 입력 seq에서 각 단어에 대한 정보를 인코딩하는데 사용

주어진 Query에 대해 가장 관련성이 높은 key-value 쌍을 선택하여 이를 통해 다음 단어를 예측하거나 문장을 생성

In [None]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self, d_head):
        super().__init__()
        self.scale = 1 / (d_head ** 0.5)
    
    def forward(self, Q, K, V, attn_mask):
        
        scores = torch.matmul(Q, K.transpose(-1, -2)).mul_(self.scale) # (bs, n_head, q_seq_len, k_seq_len)
        scores.masked_fill_(attn_mask, -1e9)
        
        attn_prob = nn.Softmax(dim=-1)(scores) # (bs, n_head, q_seq_len, k_seq_len)
        
        context = torch.matmul(attn_prob, V) # (bs, n_head, q_seq_len, d_v)
        
        return context, attn_prob # (bs, n_head, q_seq_len, d_v), (bs, n_head, q_seq_len, v_seq_len)

---
# Position-wise Feed Forward Net
---

In [None]:
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self, d_hidn):
        super().__init__()

        self.conv1 = nn.Conv1d(in_channels=self.config.d_hidn, out_channels=self.config.d_hidn * 4, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=self.config.d_hidn * 4, out_channels=self.config.d_hidn, kernel_size=1)
        self.active = F.gelu # 비선형성을 추가하고 모델의 표현력을 증가

    def forward(self, inputs):
        
        output = self.active(self.conv1(inputs.transpose(1, 2))) # (bs, d_ff, n_seq)
        
        output = self.conv2(output).transpose(1, 2) # (bs, n_seq, d_hidn)
        
        return output # (bs, n_seq, d_hidn)

---
# ETC
---

In [None]:
# Position Encoding값을 구하는 함수 

def get_sinusoid_encoding_table(n_seq, d_hidn):
    
    # 입력 위치와 임베딩 차원에 따라 해당 위치의 각도값을 계산
    def cal_angle(position, i_hidn):
        return position / np.power(10000, 2 * (i_hidn // 2) / d_hidn)
    
    # 입력 위치에 따라 모든 임베딩 차원의 각도값 계산 
    def get_posi_angle_vec(position):
        return [cal_angle(position, i_hidn) for i_hidn in range(d_hidn)]

    # seq길이와 임베딩 차원 수에 따라 시퀀스 길이 만큼 반복해서 생성된 값들을 저장
    sinusoid_table = np.array([get_posi_angle_vec(i_seq) for i_seq in range(n_seq)])
    
    # 짝수 인덱스의 값들은 sin함수로 계산하고 홀수는 cos함수로 계산
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # even index sin 
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # odd index cos
    '''sin, cos 함수는 시계열, 주기성이 있는 데이처를 처리할 때 사용되며 
       생성된 값들은 특정한 주기를 가지게 되며 이를 통해 입력 seq내의 토큰 위치를 구분할 수 있게 됨
       
       즉, sin, cos를 사용해 위치 정보를 부여함으로써 
       입력 seq 토큰 위치에 대한 정보를 학습하고 이를 활용해 문맥 정보를 파악할 수 있게 됨'''
    return sinusoid_table

<img src='https://paul-hyun.github.io/assets/2019-12-19/pad_mask.png'>

In [None]:
# padding이 적용된 위치에 대한 mask를 생성하는 함수 
def get_attn_pad_mask(seq_q, seq_k, i_pad):
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()

    # i_pad와 일치하는 부분을 True, 아니면 False
    pad_attn_mask = seq_k.data.eq(i_pad)

    # seq_k와 같은 길이로 len_q까지 확장
    pad_attn_mask= pad_attn_mask.unsqueeze(1).expand(batch_size, len_q, len_k)
    return pad_attn_mask

<img src='https://paul-hyun.github.io/assets/2019-12-19/decoder_mask.png'>

In [None]:
# attention decoder mask 
def get_attn_decoder_mask(seq):

    # Decoder의 입력 시퀀스와 동일한 크기의 tensor를 생성하고 모든 요소를 1로 채움
    subsequent_mask = torch.ones_like(seq).unsqueeze(-1).expand(seq.size(0), seq.size(1), seq.size(1))
    
    # 대각선 기준 위쪽 삼각형만 남기고 아래쪽 삼각형을 0으로 만듬
    subsequent_mask = subsequent_mask.triu(diagonal=1) # upper triangular part of a matrix(2-D)
    
    # Decoder의 입력 시퀀스를 대각선 기준으로 위쪽 삼각형 = 0, 아래쪽 삼각형 = 1로 채운 이진 행렬 
    return subsequent_mask

----
# Decoder Layer
----

<img src='https://paul-hyun.github.io/assets/2019-12-30/decoder.png'>

In [None]:
class DecoderLayer(nn.Module):
  
  def __init__(self, config):
    super().__init__()
    self.config = config

    self.self_attn = MultiHeadAttention(self.config)
    self.layer_norm1 = nn.LayerNorm(self.config.d_hidn, eps=self.config.layer_norm_epsilon)
    '''입력 tensor의 마지막 차원을 기준으로 계산된 평균과 표준편차를 이용하여 정규화된 tensor를 출력
       학습 안정화 / Gradient vanishing / 일반화 성능향상에 장점이존재'''
       
    self.pos_ffn = PoswiseFeedForwardNet(self.config)
    self.layer_norm3 = nn.LayerNorm(self.config.d_hidn, eps=self.config.layer_norm_epsilon)
  
  def forward(self, dec_inputs, self_attn_mask):
      
    self_att_outputs, self_attn_prob = self.self_attn(dec_inputs, dec_inputs, dec_inputs, self_attn_mask)
    # (bs, n_dec_seq, d_hidn), (bs, n_head, n_dec_seq, n_dec_seq)

    self_att_outputs = self.layer_norm1(dec_inputs + self_att_outputs)
    
    
    ffn_outputs = self.pos_ffn(self_att_outputs) # (bs, n_dec_seq, d_hidn)
    ffn_outputs = self.layer_norm3(self_att_outputs + ffn_outputs)
    
    return ffn_outputs, self_attn_prob    
    # (bs, n_dec_seq, d_hidn), (bs, n_head, n_dec_seq, n_dec_seq), (bs, n_head, n_dec_seq, n_enc_seq)

---
# Decoder
---

In [None]:
class Decoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config

        self.dec_emb = nn.Embedding(self.config.n_dec_vocab, self.config.d_hidn)
        sinusoid_table = torch.FloatTensor(get_sinusoid_encoding_table(self.config.n_dec_seq + 1, self.config.d_hidn))
        self.pos_emb = nn.Embedding.from_pretrained(sinusoid_table, freeze=True)

        self.layers = nn.ModuleList([DecoderLayer(self.config) for _ in range(self.config.n_layer)])
    
    def forward(self, dec_inputs):
        positions = torch.arange(dec_inputs.size(1), device=dec_inputs.device, dtype=dec_inputs.dtype).expand(dec_inputs.size(0), dec_inputs.size(1)).contiguous() + 1
        pos_mask = dec_inputs.eq(self.config.i_pad)
        positions.masked_fill_(pos_mask, 0)
    
        # (bs, n_dec_seq, d_hidn)
        dec_outputs = self.dec_emb(dec_inputs) + self.pos_emb(positions)


        dec_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs, self.config.i_pad)

        dec_attn_decoder_mask = get_attn_decoder_mask(dec_inputs)

        dec_self_attn_mask = torch.gt((dec_attn_pad_mask + dec_attn_decoder_mask), 0)

        self_attn_probs = []
        for layer in self.layers:
           
            dec_outputs, self_attn_prob = layer(dec_outputs, dec_self_attn_mask)
            self_attn_probs.append(self_attn_prob)

        return dec_outputs, self_attn_probs