In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

import math
import numpy as np
import pandas as pd
import random
import re

from torch.utils.data import DataLoader, Dataset

### 기본 Transformer 구조

In [None]:
class Transformer(nn.Module):
  def __init__(self, num_tokens, dim_model, num_heads, num_encoder_layers, num_decoder_layers, dropout_p):
    super.__init__()

    self.transformer = nn.Transformer(
        d_model = dim_model,
        nhead = num_heads,
        num_encoder_layers = num_encoder_layers,
        num_decoder_layers = num_decoder_layers,
        dropout = dropout_p
    )

  def forward(self):
    pass



### Positional Encoding

In [45]:
class PositionalEncoding(nn.Module):
  def __init__(self, dim_model, dropout_p, max_len):
    super().__init__()
    self.dropout = nn.Dropout(dropout_p)
    
    # Encoding - Form formula
    pos_encoding = torch.zeros(max_len, dim_model)
    positions_list = torch.arange(0, max_len, dtype = torch.float).view(-1,1) # position은 홀 짝 번갈아 나온다.
    division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.))/dim_model) # div term은 짝수로 고정이다. 

    pos_encoding[:, 0::2] = torch.sin(positions_list * division_term) # 짝수의 경우에는 sin
    pos_encoding[:, 1::2] = torch.cos(positions_list * division_term) # 짝수의 경우에는 cos

    # Saving Buffer
    pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1) # flatten후 행벡터로 변환
    self.register_buffer("pos_encoding", pos_encoding)

    # register_buffer 로 layer를 등록하면 어떤 특징이 있는가?
    # 1. optimizer가 업데이트하지 않는다.
    # 2. 그러나 값은 존재한다(하나의 layer로써 작용한다고 보면 된다.)
    # 3. state_dict()로 확인이 가능하다.
    # 4. GPU연산이 가능하다.
    #따라서 네트워크를 구성함에 있어서 네트워크를 end2end로 학습시키고 싶은데 중간에 업데이트를 하지않는 일반 layer를 넣고 싶을 때 사용할 수 있다.

  def forward(self, token_embedding : torch.tensor) -> torch.tensor: # ->??
    return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0),:] ) # nn.Dropout의 연산은 20%를 0으로 만들어 버린다. 
 



In [73]:
class Transformer(nn.Module):
  def __init__(self, num_tokens, dim_model, num_heads, num_encoder_layers, num_decoder_layers, dropout_p):
    super().__init__()
    self.model_type = "Transformer"
    self.dim_model = dim_model
    self.positional_encoder = PositionalEncoding(dim_model = dim_model, dropout_p = dropout_p, max_len = 5000) # 학습 불가능 한 pos encoding 층
    self.embedding = nn.Embedding(num_tokens, dim_model)
    self.transformer = nn.Transformer(
        d_model = dim_model,
        nhead = num_heads,
        num_encoder_layers = num_encoder_layers,
        num_decoder_layers = num_decoder_layers,
        dropout = dropout_p
    )
    self.out = nn.Linear(dim_model, num_tokens)

  def forward(self, src, tgt, tgt_mask = None, src_pad_mask = None, tgt_pad_mask = None):
    # src, tgt 의 size는 반드시 (batch_size, src squence length)이어야 한다.
    # Embedding + pos encoding - out size = (batch_size, sequence length, dim_model)

    src = self.embedding(src) * math.sqrt(self.dim_model)
    tgt = self.embedding(tgt) * math.sqrt(self.dim_model)
    src = self.positional_encoder(src)
    tgt = self.positional_encoder(tgt)

    src = src.permute(1,0,2)
    tgt = tgt.permute(1,0,2)

    transformer_out = self.transformer(src, tgt, tgt_mask = tgt_mask, src_key_padding_mask = src_pad_mask, tgt_key_padding_mask = tgt_pad_mask)
    out = self.out(transformer_out)
    return out

  def get_tgt_mask(self, size):
    tgt_mask = torch.tril(torch.ones(size,size) == 1)
    tgt_mask = tgt_mask.float()
    tgt_mask = tgt_mask.masked_fill(tgt_mask == 0 , float('-inf')) # 무한대를 정의하는 방법이구나. 
    tgt_mask = tgt_mask.masked_fill(tgt_mask == 0 , float(0))
    return tgt_mask

  def creat_pad_mask(self, matrix : torch.tensor, pad_token : int) -> torch.tensor :
    return (matrix == pad_token)

In [74]:
def generate_random_data(n):
    SOS_token = np.array([2])
    EOS_token = np.array([3])
    length = 8

    data = []

    # 1,1,1,1,1,1 -> 1,1,1,1,1
    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.ones(length), EOS_token))
        y = np.concatenate((SOS_token, np.ones(length), EOS_token))
        data.append([X, y])

    # 0,0,0,0 -> 0,0,0,0
    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        y = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        data.append([X, y])

    # 1,0,1,0 -> 1,0,1,0,1
    for i in range(n // 3):
        X = np.zeros(length)
        start = random.randint(0, 1)

        X[start::2] = 1

        y = np.zeros(length)
        if X[-1] == 0:
            y[::2] = 1
        else:
            y[1::2] = 1

        X = np.concatenate((SOS_token, X, EOS_token))
        y = np.concatenate((SOS_token, y, EOS_token))
        data.append([X, y])

    np.random.shuffle(data)

    return data

In [75]:
def batchify_data(data, batch_size=16, padding=False, padding_token=-1):
    batches = []
    for idx in range(0, len(data), batch_size):
        # batch_size 크기가 아닌 경우 마지막 비트를 얻지 않도록 합니다.
        if idx + batch_size < len(data):
            # 여기서 배치의 최대 길이를 가져와 PAD 토큰으로 길이를 정규화해야 합니다.
            if padding:
                max_batch_length = 0
                # batch에서 가장 긴 문장 가져오기
                for seq in data[idx : idx + batch_size]:
                    if len(seq) > max_batch_length:
                        max_batch_length = len(seq)

                # 최대 길이에 도달할 때까지 X 패딩 토큰을 추가합니다.
                for seq_idx in range(batch_size):
                    remaining_length = max_bath_length - len(data[idx + seq_idx])
                    data[idx + seq_idx] += [padding_token] * remaining_length

            batches.append(np.array(data[idx : idx + batch_size]).astype(np.int64))

    print(f"{len(batches)} batches of size {batch_size}")

    return batches


train_data = generate_random_data(9000)
val_data = generate_random_data(3000)

train_dataloader = batchify_data(train_data)
val_dataloader = batchify_data(val_data)

562 batches of size 16
187 batches of size 16


In [76]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = Transformer(num_tokens=4, dim_model=8, num_heads=2, num_encoder_layers=3, num_decoder_layers=3, dropout_p=0.1).to(device)
opt = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()

### 모델 학습

In [79]:
def train_loop(model, opt, loss_fn, dataloader):
  model.train()
  total_loss = 0

  for batch in dataloader:
    X,y = batch[:, 0], batch[:, 1]
    X,y = torch.tensor(X).to(device), torch.tensor(y).to(device)

    y_input = y[:,:-1] # SOS 부터 다 넣는다.
    y_expected = y[:, 1:] # EOS를 빼고 예측한다.

    sequence_length = y_input.size(1)
    tgt_mask = model.get_tgt_mask(sequence_length).to(device) # 아까 클래스에 구현해 둔 mask 생성함수

    pred = model(X, y_input, tgt_mask) 
    pred = pred.permute(1,2,0)
    loss = loss_fn(pred, y_expected)

    opt.zero_grad()
    loss.backward()
    opt.step()
    total_loss += loss.detach().item()

  return total_loss / len(dataloader)

def validation_loop(model, loss_fn, dataloader):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            X, y = batch[:, 0], batch[:, 1]
            X, y = torch.tensor(X, dtype=torch.long, device=device), torch.tensor(y, dtype=torch.long, device=device)

            y_input = y[:,:-1]
            y_expected = y[:,1:]

            sequence_length = y_input.size(1)
            tgt_mask = model.get_tgt_mask(sequence_length).to(device)

            pred = model(X, y_input, tgt_mask)

            pred = pred.permute(1, 2, 0)      
            loss = loss_fn(pred, y_expected)
            total_loss += loss.detach().item()

    return total_loss / len(dataloader)

In [81]:
def fit(model, opt, loss_fn, train_dataloader, val_dataloader, epochs):  
    train_loss_list, validation_loss_list = [], []

    print("Training and validating model")
    for epoch in range(epochs):
        print("-"*25, f"Epoch {epoch + 1}","-"*25)

        train_loss = train_loop(model, opt, loss_fn, train_dataloader)
        train_loss_list += [train_loss]

        validation_loss = validation_loop(model, loss_fn, val_dataloader)
        validation_loss_list += [validation_loss]

        print(f"Training loss: {train_loss:.4f}")
        print(f"Validation loss: {validation_loss:.4f}")
        print()

    return train_loss_list, validation_loss_list\

In [82]:
train_loss_list, validation_loss_list = fit(model, opt, loss_fn, train_dataloader, val_dataloader, 10)

Training and validating model
------------------------- Epoch 1 -------------------------
Training loss: 0.3896
Validation loss: 0.3410

------------------------- Epoch 2 -------------------------
Training loss: 0.3582
Validation loss: 0.2987

------------------------- Epoch 3 -------------------------
Training loss: 0.3227
Validation loss: 0.2634

------------------------- Epoch 4 -------------------------
Training loss: 0.2982
Validation loss: 0.2319

------------------------- Epoch 5 -------------------------
Training loss: 0.2784
Validation loss: 0.2145

------------------------- Epoch 6 -------------------------
Training loss: 0.2676
Validation loss: 0.2033

------------------------- Epoch 7 -------------------------
Training loss: 0.2563
Validation loss: 0.1936

------------------------- Epoch 8 -------------------------
Training loss: 0.2490
Validation loss: 0.1848

------------------------- Epoch 9 -------------------------
Training loss: 0.2417
Validation loss: 0.1741

-------

In [83]:
def predict(model, input_sequence, max_length=15, SOS_token=2, EOS_token=3):
    model.eval()

    y_input = torch.tensor([[SOS_token]], dtype=torch.long, device=device)

    num_tokens = len(input_sequence[0])

    for _ in range(max_length):
        # Get source mask
        tgt_mask = model.get_tgt_mask(y_input.size(1)).to(device)

        pred = model(input_sequence, y_input, tgt_mask)

        next_item = pred.topk(1)[1].view(-1)[-1].item() # num with highest probability
        next_item = torch.tensor([[next_item]], device=device)

        # Concatenate previous input with predicted best word
        y_input = torch.cat((y_input, next_item), dim=1)

        # Stop if model predicts end of sentence
        if next_item.view(-1).item() == EOS_token:
            break

    return y_input.view(-1).tolist()


# Here we test some examples to observe how the model predicts
examples = [
    torch.tensor([[2, 0, 0, 0, 0, 0, 0, 0, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 1, 1, 1, 1, 1, 1, 1, 1, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 1, 0, 1, 0, 1, 0, 1, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 0, 1, 0, 1, 0, 1, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 3]], dtype=torch.long, device=device)
]

for idx, example in enumerate(examples):
    result = predict(model, example)
    print(f"Example {idx}")
    print(f"Input: {example.view(-1).tolist()[1:-1]}")
    print(f"Continuation: {result[1:-1]}")
    print()

Example 0
Input: [0, 0, 0, 0, 0, 0, 0, 0]
Continuation: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Example 1
Input: [1, 1, 1, 1, 1, 1, 1, 1]
Continuation: [1, 1, 1, 1, 1, 1, 1, 1, 1]

Example 2
Input: [1, 0, 1, 0, 1, 0, 1, 0]
Continuation: [1, 0, 1, 0, 1, 0, 1, 0]

Example 3
Input: [0, 1, 0, 1, 0, 1, 0, 1]
Continuation: [1, 0, 1, 0, 1, 0, 1, 0]

Example 4
Input: [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]
Continuation: [0, 1, 0, 1, 0, 1, 0, 1, 0]

Example 5
Input: [0, 1]
Continuation: [1, 0, 1, 0, 1, 0, 1, 0, 1]

