In [1]:
import math
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder , TransformerEncoderLayer

import torchtext
from torchtext.data.utils import get_tokenizer

In [348]:
class Transformer(nn.Module) : # 모델 정의
    '''
    num_token : 토큰 갯수 > 단어 임베딩을 위함
    num_inputs : embedding 차원 수
    num_heads : 멀티헤드 수
    num_hidden : layer 차원수
    num_layers : layer 수
    dropout = dropout 비율 defalut:0.3 
    '''
    def __init__(self, num_token, num_inputs, num_heads, num_hidden, num_layers, dropout = 0.3):
        super(Transformer, self).__init__()
        #======================= 초기설정 =============================
        self.model_name = "transformer"
        self.mask_source = None
        self.position_enc = PosEnc(num_inputs, dropout) # PosEnd -> functional
        
        layers_enc = TransformerEncoderLayer(num_inputs,num_heads,num_hidden, dropout) # encoder block
        self.enc_transformer = TransformerEncoder(layers_enc, num_layers) # encoder block를 n개 쌓겠다
        self.enc = nn.Embedding(num_token, num_inputs) # word to embedding layer
        self.num_inputs = num_inputs
        self.dec = nn.Linear(num_inputs, num_token)
        self.init_params() # init_params() -> functional
    
    def _gen_sqr_nxt_mask(self, size):
        ''' 
        torch.triu : 대각행렬 기준 위쪽만 1로 채움
            ex) in : torch.triu(torch.ones(5,5))
                out : tensor([[1., 1., 1., 1., 1.],
                              [0., 1., 1., 1., 1.],
                              [0., 0., 1., 1., 1.],
                              [0., 0., 0., 1., 1.],
                              [0., 0., 0., 0., 1.]])
                              
                in : (torch.triu(torch.ones(size,size)) == 1).transpose(0,1) 
                out : tensor([[ True, False, False, False, False],
                              [ True,  True, False, False, False],
                              [ True,  True,  True, False, False],
                              [ True,  True,  True,  True, False],
                              [ True,  True,  True,  True,  True]])    
                
        '''
        msk = (torch.triu(torch.ones(size,size)) == 1).transpose(0,1) # mask 안할 위치 지정
        msk = msk.float().masked_fill(msk==0, float("-inf")) # mask 할 위치 -inf
        msk = msk.masked_fill(msk == 1, float(0.0)) # mask 안할 위치 0으로 변경
        return msk
    
    def init_params(self):
        ''' 
        파라미터 초기화 세팅 function
        '''
        initial_rng = 0.12
        self.enc.weight.data.uniform_(-initial_rng , initial_rng) # -initial_rng ~ initial_rng 사이 실수
        self.dec.bias.data.zero_() # bias 0
        self.dec.weight.data.uniform_(-initial_rng , initial_rng)
        
    def forward(self, source , src_mask = None):
        '''
        source : 단어 list
        mask_source : mask의 형태(특정 마스크로 지정가능)
        '''
        if src_mask is None or src_mask.size(0) != len(source): # mask_source가 지정되지 않으면
            dvc = source.device # 입력데이터는 어디에 적재되어있는가 "CPU"? or "GPU"
            msk = self._gen_sqr_nxt_mask(len(source)).to(dvc) # mask 배열 생성 
            self.mask_source = msk  
            
        source = self.enc(source) * math.sqrt(self.num_inputs) # self.enc : embedding layer / out > source embedding 
                                                               # math.sqrt : 값 정규화
        source = self.position_enc(source) # positional encoding
        
        op = self.enc_transformer(source, self.mask_source)
        op = self.dec(op) # 단순 linear를 이용하여 decode
        return op
class PosEnc(nn.Module):
    '''
    Positional Encoding 
    '''
    def __init__(self, d_m, dropout = 0.2 , size_limit = 5000):
        super(PosEnc,self).__init__()
        self.dropout = nn.Dropout(dropout)
        
        p_enc = torch.zeros(size_limit, d_m) # size_limit : vocab length, d_m : embedding dim 
                                             # 2차원 array
        
        pos = torch.arange(0, size_limit,dtype = torch.float).unsqueeze(1) # [size_limit,] > [size_limit,1]
        
        divider = torch.exp(torch.arange(0,d_m,2).float() * (-math.log(10000.0) / d_m))
        
        p_enc[:,0::2] = torch.sin(pos * divider)
        p_enc[:,1::2] = torch.cos(pos * divider) # [size_limit,d_m] 
        p_enc = p_enc.unsqueeze(0).transpose(0,1) # [size_limit,d_m] > [1,size_limit,d_m] > [size_limit,1,d_m]
        
        self.register_buffer("p_enc",p_enc)
    
    def forward(self,x):
        return self.dropout(x + self.p_enc[:x.size(0),:])

In [349]:
get_tokenizer("basic_english")

<function torchtext.data.utils._basic_english_normalize(line)>

In [350]:
# https://tutorials.pytorch.kr/beginner/transformer_tutorial.html
# 위 링크를 참고하였습니다.
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
 
# 1. vocab 생성

tokenizer = get_tokenizer("basic_english")

def yield_tokens(data_iter):
    for text in data_iter : # (label, text) 
        yield tokenizer(text)

training_text, validation_text, testing_text = torchtext.datasets.WikiText2() # train, val, test 제공

# training_text를 이용하여 vocab 생성
vocab = build_vocab_from_iterator(yield_tokens(training_text),
                                  specials= ["<sos>","<eos>","<unk>"]) # vocab생성
vocab.set_default_index(vocab["<unk>"]) # 없는 단어 처리

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2. sentence to number (vocab)

def data_process(raw_text_iter):
    # 텍스트를 하나씩 넣어 숫자로 변환
    data = [ torch.tensor(vocab(tokenizer(item)), dtype=torch.long) for item in raw_text_iter]    
    # list.numel() > 원소 개수
    # 원소가 하나도 없음 : 텅빈 문장은 제외 -> 모든 문장의 길이 [n,] 형식으로 저장
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data))) 

train_data = data_process(training_text) 
val_data = data_process(validation_text)
test_data = data_process(testing_text)

def batchify(data , bsz): # 지정한 최대길이(batch_size)만큼 자름
    '''
    배치크기 나누기
    '''
    seq_len = data.size(0) // bsz
    data = data[:seq_len * bsz]
    data = data.view(bsz,seq_len).t().contiguous()
    return data.to(device)

batch_size = 20 
eval_batch_size = 10
train_data = batchify(train_data, batch_size) # 문장별로 넣는게 아니라, 모든 단어를 일차원으로 나열하여 단순하게 자름
val_data = batchify(val_data, eval_batch_size) 
test_data = batchify(test_data, eval_batch_size) 

In [351]:
bptt = 35
def get_batch(source, i ):
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]

    target = source[i+1:i+1+seq_len].reshape(-1)
    return data,target

In [352]:
ntokens = len(vocab)
emsize = 100 # embedding size
d_hid = 100 # hidden dim
nlayers = 2 
nhead = 2
dropout = 0.2
model = Transformer(ntokens,emsize,nhead,d_hid,nlayers,dropout).to(device)

In [353]:
import copy
import time

criterion = nn.CrossEntropyLoss()

lr = 5.0
optimizer = torch.optim.SGD(model.parameters(), lr= lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma = 0.95)
def generate_square_subsequent_mask(sz):
    """Generates an upper-triangular matrix of -inf, with zeros on diag."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)


def train(model):
    model.train()
    total_loss = 0
    log_interval = 200
    start_time = time.time()
    src_mask = generate_square_subsequent_mask(bptt).to(device)
    
    num_batches = len(train_data) // bptt
    
    for batch, i in enumerate(range(0, train_data.size(0) - 1 ,bptt)):
        data, targets = get_batch(train_data,i)
        seq_len = data.size(0)
        if seq_len != bptt: # 마지막 부분에 설정한 부분보다 적은 수의 단어가 들어가있는 경우가 있기에
            src_mask = src_mask[:seq_len, :seq_len]

        output = model(data,src_mask)
        
        loss = criterion(output.view(-1,ntokens) , targets)
        
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5) # 기울기 폭발 방지
        optimizer.step()
        
        total_loss += loss.item()
        if batch % log_interval == 0 and batch> 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss)
            print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
                  f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, eval_data):
    model.eval()  # 평가 모드 시작
    total_loss = 0.
    src_mask = generate_square_subsequent_mask(bptt).to(device)
    with torch.no_grad():
        for i in range(0, eval_data.size(0) - 1, bptt):
            data, targets = get_batch(eval_data, i)
            seq_len = data.size(0)
            if seq_len != bptt:
                src_mask = src_mask[:seq_len, :seq_len]
            output = model(data, src_mask)
            output_flat = output.view(-1, ntokens)
            total_loss += seq_len * criterion(output_flat, targets).item()
    return total_loss / (len(eval_data) - 1)
        
            

In [354]:
best_val_loss = float('inf')
epochs = 3
best_model = None

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(model)
    val_loss = evaluate(model, val_data)
    val_ppl = math.exp(val_loss)
    elapsed = time.time() - epoch_start_time
    print('-' * 89)
    print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
          f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
    print('-' * 89)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = copy.deepcopy(model)

    scheduler.step()

| epoch   1 |   200/ 2928 batches | lr 5.00 | ms/batch 10.17 | loss  8.14 | ppl  3422.65
| epoch   1 |   400/ 2928 batches | lr 5.00 | ms/batch 11.14 | loss  6.75 | ppl   851.33
| epoch   1 |   600/ 2928 batches | lr 5.00 | ms/batch 10.82 | loss  5.96 | ppl   385.79
| epoch   1 |   800/ 2928 batches | lr 5.00 | ms/batch 11.45 | loss  5.74 | ppl   311.77
| epoch   1 |  1000/ 2928 batches | lr 5.00 | ms/batch 11.56 | loss  5.63 | ppl   278.73
| epoch   1 |  1200/ 2928 batches | lr 5.00 | ms/batch 11.17 | loss  5.60 | ppl   270.33
| epoch   1 |  1400/ 2928 batches | lr 5.00 | ms/batch 11.01 | loss  5.52 | ppl   250.22
| epoch   1 |  1600/ 2928 batches | lr 5.00 | ms/batch 11.42 | loss  5.38 | ppl   217.61
| epoch   1 |  1800/ 2928 batches | lr 5.00 | ms/batch 11.49 | loss  4.89 | ppl   132.84
| epoch   1 |  2000/ 2928 batches | lr 5.00 | ms/batch 11.55 | loss  4.61 | ppl   100.46
| epoch   1 |  2200/ 2928 batches | lr 5.00 | ms/batch 10.78 | loss  4.38 | ppl    80.23
| epoch   1 |  2400/ 

In [355]:
test_loss = evaluate(best_model, test_data)
test_ppl = math.exp(test_loss)
print('=' * 89)
print(f'| End of training | test loss {test_loss:5.2f} | '
      f'test ppl {test_ppl:8.2f}')
print('=' * 89)

| End of training | test loss  1.32 | test ppl     3.75
