# [Project 5] Applying a Transformer Model to Inquiry Type Classification

---


## Project Goals
---
- Build a Transformer Encoder model
- Build a Transformer Decoder model
- Train and analyze a classification model based on the Transformer Encoder


## Project Contents
---

1. **Building the Transformer Encoder:** Build the Transformer Encoder model.

2. **Building the Transformer Decoder:** Build the Transformer Decoder model.

3. **Training and Analyzing a Classifier with the Transformer Encoder:** Use the Transformer Encoder to build and train an inquiry type classification model.


## Project Overview
---

In Project 4, we built an LSTM model, added an attention module, and trained it as a classifier. In this project, we build a Transformer, whose core module is self-attention, and then train a classification model using the Transformer Encoder.

## 1. Data Preprocessing

---

### 1.1. Loading Libraries and Data


Load the data used in Project 1 and the libraries needed for model training.

In [1]:
!pip install torch torchtext==0.11.0



In [2]:
import pandas as pd
import re
from konlpy.tag import Okt

import random
import numpy as np
import torch
import torchtext

In [3]:
data = pd.read_csv('./01_data.csv', encoding='cp949')
texts = data['메모'].tolist() # Convert the natural-language column to a list
label_list = data['상담유형3_GT'].unique().tolist()
labels = data['상담유형3_GT'].tolist()

In [4]:
def cleaning(text):
    # Cleaning: remove every character except Hangul and whitespace
    text = re.sub('[^가-힣ㄱ-ㅎㅏ-ㅣ\\s]', '', text)
    return text

In [5]:
texts_clean = []
for i in range(len(texts)):
    text_clean = cleaning(texts[i])
    texts_clean.append(text_clean)
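
As a quick illustration of what `cleaning` keeps and drops, the sketch below applies the same regular expression to a made-up sentence. The sample string is an assumption for illustration and is not taken from the dataset.

```python
import re

def cleaning(text):
    # Same pattern as above: keep Hangul syllables, Hangul jamo, and whitespace
    return re.sub(r'[^가-힣ㄱ-ㅎㅏ-ㅣ\s]', '', text)

sample = '배송 문의 123번!! ㅠㅠ'  # hypothetical example input
cleaned = cleaning(sample)
print(repr(cleaned))
```

Digits, Latin letters, and punctuation are removed, while whitespace is left untouched, so runs of consecutive spaces can remain after cleaning.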

Split the data into training and test sets.

In [6]:
num_train = int(0.8*len(texts_clean))

texts_labels = list(zip(texts_clean,labels))
random.shuffle(texts_labels)
texts_clean, labels = zip(*texts_labels)

train_texts = texts_clean[:num_train]
train_labels = labels[:num_train]

test_texts = texts_clean[num_train:]
test_labels = labels[num_train:]
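
The shuffle above depends on the global random state, so the split can differ from run to run. A minimal sketch of a reproducible variant, assuming a fixed seed is acceptable (the helper name `split_pairs`, the seed value, and the toy data are illustrative and not part of the original notebook):

```python
import random

def split_pairs(texts, labels, train_ratio=0.8, seed=0):
    # Shuffle (text, label) pairs together so each text keeps its label
    pairs = list(zip(texts, labels))
    random.Random(seed).shuffle(pairs)  # local RNG, global state untouched
    num_train = int(train_ratio * len(pairs))
    return pairs[:num_train], pairs[num_train:]

toy_texts = [f'text {i}' for i in range(10)]
toy_labels = [i % 2 for i in range(10)]
train_pairs, test_pairs = split_pairs(toy_texts, toy_labels)
print(len(train_pairs), len(test_pairs))
```

Calling `split_pairs` twice with the same seed returns the same partition, which makes later accuracy numbers easier to compare.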

In [7]:
train_data = pd.DataFrame({'text': train_texts,
                          'label': train_labels})
test_data = pd.DataFrame({'text': test_texts,
                          'label': test_labels})

In [8]:
train_data.to_csv('./train_data.csv',index=False)
test_data.to_csv('./test_data.csv',index=False)

### 1.2. Data Preprocessing
---

Preprocess the data and labels, which were stored as lists, so that they can be fed to a torch model. The torchtext library is used for this step.

In [9]:
tokenizer = Okt()

TEXT = torchtext.legacy.data.Field(tokenize=tokenizer.morphs,
                 include_lengths=True)

LABEL = torchtext.legacy.data.LabelField(dtype=torch.long)

fields = {'text': ('text', TEXT), 'label': ('label', LABEL)}

Create the training, validation, and test datasets.

In [10]:
train_data, test_data = torchtext.legacy.data.TabularDataset.splits(
                            path = './',
                            train = 'train_data.csv',
                            test = 'test_data.csv',
                            format = 'csv',
                            fields = fields,  
)

In [11]:
train_data, valid_data = train_data.split()

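Load embedding vectors to represent the natural-language data numerically. This project uses FastText subword embeddings, which are also available for Korean. Note that the `fasttext.simple.300d` alias used below refers to vectors trained on Simple English Wikipedia, so Korean tokens missing from it are initialized randomly through `unk_init`; Korean vectors should be loadable with `torchtext.vocab.FastText(language='ko')` instead.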

In [12]:
TEXT.build_vocab(train_data,
                 max_size = 10000,
                 vectors = 'fasttext.simple.300d',
                 unk_init = torch.Tensor.normal_)

LABEL.build_vocab(train_data)

In [13]:
batch_size = 64

device = torch.device('cpu')

train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = batch_size,
    sort_key = lambda x: len(x.text),
    sort_within_batch = True,
    device = device)

## 2. Building the Transformer Encoder

---

Build the Transformer Encoder model in torch. The Encoder is composed of the following modules.


* Transformer Encoder
    * Positional Encoding
    * Transformer Encoder Layer
        * Multi-head (self) Attention
            * Scaled-Dot Product Attention
        * Feed-Forward


In [14]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

Build the Scaled Dot-Product Attention module.

In [15]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self, temperature, dropout=0.1):
        super(ScaledDotProductAttention, self).__init__()

        self.temperature = temperature
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, q, k, v, mask=None):
        attn_score = torch.matmul(q, k.transpose(2, 3)) / self.temperature
        if mask is not None:
            attn_score = attn_score.masked_fill(mask=mask, value=float('-inf'))
        attn_dist = torch.softmax(attn_score, dim=-1)
        attn = self.dropout(attn_dist)

        attn_out = torch.matmul(attn, v)

        return attn_out, attn_dist
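
To make the computation concrete, here is a dependency-free sketch of the same steps (score, scale, mask, softmax, weighted sum) for a single head on plain Python lists. The function name and the toy values are assumptions for illustration; the module above performs the same arithmetic on batched tensors.

```python
import math

def scaled_dot_product_attention(q, k, v, mask=None):
    """q: [q_len][d], k: [k_len][d], v: [k_len][d_v]; mask[j] True = hide key j."""
    d = len(q[0])
    out, dist = [], []
    for qi in q:
        # Dot product of the query with every key, scaled by sqrt(d)
        scores = [sum(a * b for a, b in zip(qi, kj)) / math.sqrt(d) for kj in k]
        if mask is not None:
            scores = [float('-inf') if m else s for s, m in zip(scores, mask)]
        # Numerically stable softmax; exp(-inf) evaluates to 0.0
        top = max(scores)
        exps = [math.exp(s - top) for s in scores]
        total = sum(exps)
        weights = [e / total for e in exps]
        dist.append(weights)
        # Weighted sum of the value vectors
        out.append([sum(w * vj[c] for w, vj in zip(weights, v))
                    for c in range(len(v[0]))])
    return out, dist

q = [[1.0, 0.0]]
k = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
v = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
out, dist = scaled_dot_product_attention(q, k, v, mask=[False, False, True])
print(dist)
```

The masked key receives a weight of exactly zero. If every key in a row were masked, the softmax would be undefined (NaN in the tensor version), which is presumably why the training loop later skips batches that contain zero-length texts.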

Build the Multi-head Attention module.

In [16]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_dim, k_channels, v_channels, n_head=8, dropout=0.1):
        super(MultiHeadAttention, self).__init__()

        self.embedding_dim = embedding_dim
        self.k_channels = k_channels
        self.v_channels = v_channels
        self.n_head = n_head

        self.q_linear = nn.Linear(embedding_dim, n_head * k_channels)
        self.k_linear = nn.Linear(embedding_dim, n_head * k_channels)
        self.v_linear = nn.Linear(embedding_dim, n_head * v_channels)
        self.attention = ScaledDotProductAttention(temperature=k_channels ** 0.5, dropout=dropout)
        self.out_linear = nn.Linear(n_head * v_channels, embedding_dim)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, q, k, v, mask=None):
        b, q_len, k_len, v_len = q.size(0), q.size(1), k.size(1), v.size(1)

        q = self.q_linear(q).view(b, q_len, self.n_head, self.k_channels).transpose(1, 2)
        k = self.k_linear(k).view(b, k_len, self.n_head, self.k_channels).transpose(1, 2)
        v = self.v_linear(v).view(b, v_len, self.n_head, self.v_channels).transpose(1, 2)

        if mask is not None:
            mask = mask.unsqueeze(1)

        out, attn = self.attention(q, k, v, mask=mask)
        out = out.transpose(1, 2).contiguous().view(b, q_len, self.n_head * self.v_channels)
        out = self.out_linear(out)
        out = self.dropout(out)

        return out, attn

Build the Feed-forward module.

In [17]:
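class Feedforward(nn.Module):
    def __init__(self, filter_size, hidden_size, dropout=0.1):
        super(Feedforward, self).__init__()
        # Two position-wise linear layers: hidden_size -> filter_size -> hidden_size
        # (the dropout argument is accepted but not used in this implementation)
        self.fc1 = nn.Linear(hidden_size, filter_size, True)
        self.fc2 = nn.Linear(filter_size, hidden_size, True)

    def forward(self, x):
        out = self.fc1(x)
        out = F.relu(out)
        out = self.fc2(out)
        # Note: the original Transformer applies no activation after the second
        # linear layer; this notebook keeps a ReLU here.
        out = F.relu(out)

        return out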

Build the Transformer Encoder Layer (block) module.

In [18]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, embedding_dim, filter_size, num_heads, dropout_rate):
        super(TransformerEncoderLayer, self).__init__()
        
        self.attention = MultiHeadAttention(embedding_dim=embedding_dim, k_channels=embedding_dim//num_heads, v_channels=embedding_dim//num_heads, n_head=num_heads, dropout=dropout_rate)
        self.attention_norm = nn.LayerNorm(normalized_shape=embedding_dim)

        self.feedforward = Feedforward(filter_size=filter_size, hidden_size=embedding_dim)
        self.feedforward_norm = nn.LayerNorm(normalized_shape=embedding_dim)

    def forward(self, src, src_mask=None):
        attn_out, _ = self.attention(src, src, src, src_mask)
        out = src + attn_out
        out = self.attention_norm(out)
        
        ffn_out = self.feedforward(out)
        out = out + ffn_out
        out = self.feedforward_norm(out) 

        return out

Build the Positional Encoder module.

In [19]:
class PositionalEncoder(nn.Module):
    def __init__(self, embedding_dim, max_len=2000, dropout=0.1):
        super(PositionalEncoder, self).__init__()

        self.position_encoder = self.generate_encoder(embedding_dim, max_len)
        self.position_encoder = self.position_encoder.unsqueeze(0)
        self.dropout = nn.Dropout(p=dropout)

    def generate_encoder(self, embedding_dim, max_len):
        pos = torch.arange(max_len).float().unsqueeze(1)

        i = torch.arange(embedding_dim).float().unsqueeze(0)
        angle_rates = 1 / torch.pow(10000, (2 * (i // 2)) / embedding_dim)

        position_encoder = pos * angle_rates
        position_encoder[:, 0::2] = torch.sin(position_encoder[:, 0::2])
        position_encoder[:, 1::2] = torch.cos(position_encoder[:, 1::2])

        return position_encoder

    def forward(self, x):
        out = x + self.position_encoder[:, :x.size(1), :]
        out = self.dropout(out)

        return out
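
The table built by `generate_encoder` follows the sinusoidal rule in which dimensions `2j` and `2j+1` share the frequency `1 / 10000^(2j/d)`, with sine on even and cosine on odd dimensions. A small pure-Python sketch of the same formula, useful for inspecting individual entries (the function name `positional_encoding` is hypothetical):

```python
import math

def positional_encoding(max_len, embedding_dim):
    table = []
    for pos in range(max_len):
        row = []
        for i in range(embedding_dim):
            # Dimensions 2j and 2j+1 share one frequency
            rate = 1.0 / (10000 ** (2 * (i // 2) / embedding_dim))
            angle = pos * rate
            row.append(math.sin(angle) if i % 2 == 0 else math.cos(angle))
        table.append(row)
    return table

pe = positional_encoding(max_len=4, embedding_dim=6)
print(pe[0])  # position 0: sin(0) = 0 on even dims, cos(0) = 1 on odd dims
```

Because the table depends only on position and dimension, it is computed once and sliced to the current sequence length in `forward`.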

Using the modules above, build the Transformer Encoder model.

In [20]:
class TransformerEncoder(nn.Module):
    def __init__(self, embedding_dim, filter_size, num_enc_layers, num_heads, dropout_rate):
        super(TransformerEncoder, self).__init__()
        
        self.pos_encoder = PositionalEncoder(embedding_dim=embedding_dim, dropout=dropout_rate)
        self.layers = nn.ModuleList([TransformerEncoderLayer(embedding_dim, filter_size, num_heads, dropout_rate) for _ in range(num_enc_layers)])

    def forward(self, src, src_mask=None, length=None):
        out = self.pos_encoder(src)
        
        if length is not None:
            src_mask = torch.ones(src.size(0), src.size(1))
            for i in range(src.size(0)):
                src_mask[i][:length[i]] = 0
            src_mask = src_mask.bool()
            src_mask = src_mask.unsqueeze(1)
            
        for layer in self.layers:
            out = layer(out, src_mask)

        return out
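
The loop in `forward` marks real tokens with 0 and padded positions with 1 before converting to booleans, so `True` means "hide this key". The same rule can be sketched on plain lists with a hypothetical helper `padding_mask`:

```python
def padding_mask(lengths, max_len):
    # True marks a padded position that attention should ignore
    return [[pos >= length for pos in range(max_len)] for length in lengths]

mask = padding_mask(lengths=[3, 1], max_len=4)
for row in mask:
    print(row)
```

In the model, this `(batch, seq_len)` mask is expanded to `(batch, 1, seq_len)` in the encoder and to `(batch, 1, 1, seq_len)` inside `MultiHeadAttention`, so it broadcasts over heads and query positions.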

## 3. Building the Transformer Decoder

---

Build the Transformer Decoder model in torch. The Decoder is composed of the following modules.


* Transformer Decoder
    * Transformer Decoder Layer
        * Multi-head (self) Attention
            * Scaled-Dot Product Attention
        * Multi-head (encoder-decoder) Attention
            * Scaled-Dot Product Attention
        * Feed-Forward

As the list shows, the decoder can be assembled from the attention and feed-forward modules built earlier. Pay close attention to which inputs `self_attention` and `attention` receive in the `forward` function.

In [21]:
class TransformerDecoderLayer(nn.Module):
    def __init__(self, input_size, filter_size, num_heads, dropout_rate):
        super(TransformerDecoderLayer, self).__init__()
        # Self-attention over the target sequence
        self.self_attention = MultiHeadAttention(embedding_dim=input_size, k_channels=input_size//num_heads, v_channels=input_size//num_heads, n_head=num_heads, dropout=dropout_rate)
        self.self_attention_norm = nn.LayerNorm(normalized_shape=input_size)

        # Encoder-decoder attention: queries from the decoder, keys and values from the encoder output
        self.attention = MultiHeadAttention(embedding_dim=input_size, k_channels=input_size//num_heads, v_channels=input_size//num_heads, n_head=num_heads, dropout=dropout_rate)
        self.attention_norm = nn.LayerNorm(normalized_shape=input_size)

        self.feedforward = Feedforward(filter_size=filter_size, hidden_size=input_size)
        self.feedforward_norm = nn.LayerNorm(normalized_shape=input_size)

    def forward(self, src, tgt, mask=None):
        # q, k, v all come from the target sequence
        self_attn_out, _ = self.self_attention(tgt, tgt, tgt, mask)
        out = tgt + self_attn_out
        out = self.self_attention_norm(out)
        
        # q comes from the decoder state, k and v from the encoder output (src)
        attn_out, _ = self.attention(out, src, src)
        out = out + attn_out
        out = self.attention_norm(out)
        
        ffn_out = self.feedforward(out)
        out = out + ffn_out
        out = self.feedforward_norm(out) 

        return out

In [22]:
class TransformerDecoder(nn.Module):
    def __init__(self, input_size, filter_size, num_dec_layers, num_heads, dropout_rate):
        super(TransformerDecoder, self).__init__()
        
        self.layers = nn.ModuleList([TransformerDecoderLayer(input_size, filter_size, num_heads, dropout_rate) for _ in range(num_dec_layers)])

    def forward(self, src, tgt, mask=None):
        out = tgt
        for layer in self.layers:
            out = layer(src, out, mask)

        return out
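
The `mask` argument of the decoder is left to the caller. For autoregressive decoding, a common choice (an assumption here, since the notebook never runs the decoder) is a look-ahead mask that hides later target positions from each query. A sketch on plain lists, with `True` meaning "masked" to match the `masked_fill` convention used in the attention module; the helper name `look_ahead_mask` is hypothetical:

```python
def look_ahead_mask(tgt_len):
    # Row i is the query position, column j the key position;
    # keys with j > i lie in the future and are masked
    return [[j > i for j in range(tgt_len)] for i in range(tgt_len)]

for row in look_ahead_mask(3):
    print(row)
```

As a tensor, such a mask would need a leading batch dimension, for example shape `(1, tgt_len, tgt_len)`, so that the `unsqueeze(1)` in `MultiHeadAttention` lines it up with the `(batch, n_head, q_len, k_len)` score tensor.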

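## 4. Inquiry Type Classification with the Transformer Encoder

---
The Transformer Encoder-Decoder architecture is best suited to tasks that have both an input sequence and an output sequence. For classification, the class only has to be inferred from the input sequence, so the Transformer Encoder alone is sufficient.

In this section, we therefore build and train an inquiry type classification model on top of the Transformer Encoder.


### 4.1 Building the Transformer Classification Model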

In [23]:
class TransformerClassifier(nn.Module):

    def __init__(self, embedding_dim, filter_size, vocab_size, target_size, num_enc_layers, num_heads, dropout_rate, pad_idx):
        super(TransformerClassifier, self).__init__()
        # Word embeddings
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # Transformer Encoder
        self.tfm = TransformerEncoder(embedding_dim, filter_size, num_enc_layers, num_heads, dropout_rate)
        # Classifier
        self.fc = nn.Linear(embedding_dim, target_size)
    
    def forward(self, text, text_length):
        embeds = self.word_embeddings(text)  # (seq_len, batch, embedding_dim)
        embeds = embeds.permute(1, 0, 2)  # -> (batch, seq_len, embedding_dim)
        tfm_out = self.tfm(embeds, length=text_length)
        # Pool over time: sum across positions, divided by each true length
        tfm_out_pool = tfm_out.sum(dim=1) / text_length.unsqueeze(1)
        logits = self.fc(tfm_out_pool)
        scores = F.log_softmax(logits, dim=1)
        
        return scores
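
The pooling step divides the sum over all time steps by the true length. Because that sum runs over every position in the padded batch, encoder outputs at padded positions also contribute to it. A sketch of a length-aware mean that excludes them, written on plain lists; the helper name and toy numbers are assumptions, and this is a possible variant rather than what the notebook does:

```python
def masked_mean(sequence, length):
    """sequence: [seq_len][dim] outputs for one example; length: true token count."""
    dim = len(sequence[0])
    valid = sequence[:length]  # drop padded time steps
    return [sum(step[c] for step in valid) / length for c in range(dim)]

outputs = [[1.0, 2.0], [3.0, 4.0], [9.0, 9.0]]  # last step stands in for padding
print(masked_mean(outputs, length=2))
```

Whether this changes the accuracy reported later has not been measured here; it only illustrates the difference between the two pooling rules.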

### 4.2 Hyperparameter Settings

In [24]:
VOCAB_SIZE = len(TEXT.vocab) # Vocabulary size
EMBEDDING_DIM = 300 # Embedding dimension
FILTER_SIZE = 600 # Inner dimension of the feed-forward layer
TARGET_SIZE = len(LABEL.vocab.stoi) # Number of label classes
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token] # Padding index
NUM_ENC_LAYERS = 2 # Number of encoder layers
NUM_HEADS = 1 # Number of attention heads
DROPOUT_RATE = 0. # Dropout rate
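
The encoder layer derives the per-head width as `embedding_dim // num_heads`. A quick check of what that gives for different head counts (the helper `head_dims` is hypothetical and only mirrors that integer division):

```python
def head_dims(embedding_dim, num_heads):
    # Mirrors k_channels = embedding_dim // num_heads in TransformerEncoderLayer
    per_head = embedding_dim // num_heads
    return per_head, per_head * num_heads

print(head_dims(300, 1))
print(head_dims(300, 8))
```

With the settings above there is a single head of width 300. With 8 heads, `300 // 8` is 37, so the concatenated heads span 296 dimensions; the code still runs because `out_linear` projects back to `embedding_dim`, but a head count that divides 300 (such as 4, 5, or 6) keeps the widths aligned.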


### 4.3 Model Training

Set up the model, loss function, and optimizer.

In [25]:
model = TransformerClassifier(EMBEDDING_DIM, FILTER_SIZE, VOCAB_SIZE, TARGET_SIZE, NUM_ENC_LAYERS, NUM_HEADS, DROPOUT_RATE, PAD_IDX)
loss_function = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
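
The classifier returns `log_softmax` scores, which is why `nn.NLLLoss` is used here rather than `nn.CrossEntropyLoss`: applying log-softmax and then the negative log-likelihood amounts to the cross-entropy loss. A per-example sketch in plain Python (the function names and toy logits are assumptions for illustration):

```python
import math

def log_softmax(logits):
    # Subtract the maximum first for numerical stability
    top = max(logits)
    log_total = top + math.log(sum(math.exp(x - top) for x in logits))
    return [x - log_total for x in logits]

def nll_loss(log_probs, target):
    # Negative log-likelihood of the correct class
    return -log_probs[target]

logits = [2.0, 0.5, -1.0]  # toy scores for three classes
loss = nll_loss(log_softmax(logits), target=0)
print(round(loss, 4))
```

`nn.NLLLoss` additionally averages this value over the batch by default.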



Overwrite the embedding layer with the pretrained FastText embeddings.

In [26]:
pretrained_embeddings = TEXT.vocab.vectors
model.word_embeddings.weight.data.copy_(pretrained_embeddings) 

tensor([[ 1.3327,  0.1853, -0.6197,  ..., -2.1800, -0.2453,  1.4156],
        [-0.2287, -1.4974, -1.0487,  ..., -1.0487, -0.7778, -2.1256],
        [ 0.9228, -0.0855, -0.3727,  ..., -1.4728, -0.8051,  1.3485],
        ...,
        [-0.6370,  0.8996, -0.2592,  ...,  0.3337, -0.4391,  0.7791],
        [ 0.6624,  0.5354,  1.5111,  ...,  0.0907, -0.3367, -1.2557],
        [ 1.2730,  1.9402, -0.9689,  ..., -1.0843,  0.7427,  0.8532]])

Set the vectors for `<UNK>`, the token for out-of-vocabulary words, and `<PAD>`, the token used for padding, to zero vectors.

In [27]:
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
print(UNK_IDX, PAD_IDX)

model.word_embeddings.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.word_embeddings.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

print(model.word_embeddings.weight.data)

0 1
tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.9228, -0.0855, -0.3727,  ..., -1.4728, -0.8051,  1.3485],
        ...,
        [-0.6370,  0.8996, -0.2592,  ...,  0.3337, -0.4391,  0.7791],
        [ 0.6624,  0.5354,  1.5111,  ...,  0.0907, -0.3367, -1.2557],
        [ 1.2730,  1.9402, -0.9689,  ..., -1.0843,  0.7427,  0.8532]])


Define a function that measures accuracy.

In [28]:
def accuracy(prediction, label):
    prediction_argmax = prediction.max(dim=-1)[1]
    correct = (prediction_argmax == label).float()
    acc = correct.sum() / len(correct)
    return acc
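
The same calculation on plain lists may make the argmax step easier to follow. This is a sketch with made-up scores; `accuracy_lists` is a hypothetical name chosen so that it does not shadow the tensor version above.

```python
def accuracy_lists(predictions, labels):
    # predictions: per-example class scores; the argmax is the predicted class
    predicted = [max(range(len(row)), key=row.__getitem__) for row in predictions]
    correct = sum(p == y for p, y in zip(predicted, labels))
    return correct / len(labels)

scores = [[-0.1, -2.5], [-1.8, -0.2], [-0.3, -1.4]]  # toy log-probabilities
print(accuracy_lists(scores, labels=[0, 1, 1]))
```

Here the predicted classes are 0, 1, and 0, so two of the three examples match their labels.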

Define the training function.

In [29]:
def train(model, iterator, optimizer, loss_function):
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        optimizer.zero_grad()
        text, text_length = batch.text
        if 0. in text_length:
            continue
        predictions = model(text, text_length)
        
        loss = loss_function(predictions, batch.label)
        acc = accuracy(predictions, batch.label)
        
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

Define the evaluation function.

In [30]:
def evaluate(model, iterator, loss_function):
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
        for batch in iterator:
            text, text_length = batch.text
            if 0. in text_length:
                continue
            predictions = model(text, text_length)
            
            loss = loss_function(predictions, batch.label)
            acc = accuracy(predictions, batch.label)
            
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

Run training.

In [31]:
NUM_EPOCHS = 10
best_valid_loss = float('inf')

In [32]:
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train(model, train_iterator, optimizer, loss_function)
    valid_loss, valid_acc = evaluate(model, valid_iterator, loss_function)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tfm-best.pt')
        
    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epoch: 01
	Train Loss: 1.864 | Train Acc: 39.46%
	 Val. Loss: 1.381 |  Val. Acc: 50.60%
Epoch: 02
	Train Loss: 1.237 | Train Acc: 59.45%
	 Val. Loss: 1.181 |  Val. Acc: 61.27%
Epoch: 03
	Train Loss: 1.053 | Train Acc: 65.57%
	 Val. Loss: 1.183 |  Val. Acc: 56.06%
Epoch: 04
	Train Loss: 0.979 | Train Acc: 67.71%
	 Val. Loss: 1.136 |  Val. Acc: 62.53%
Epoch: 05
	Train Loss: 0.873 | Train Acc: 70.68%
	 Val. Loss: 1.119 |  Val. Acc: 62.77%
Epoch: 06
	Train Loss: 0.800 | Train Acc: 73.21%
	 Val. Loss: 1.086 |  Val. Acc: 62.91%
Epoch: 07
	Train Loss: 0.700 | Train Acc: 76.12%
	 Val. Loss: 1.108 |  Val. Acc: 64.53%
Epoch: 08
	Train Loss: 0.635 | Train Acc: 78.44%
	 Val. Loss: 1.113 |  Val. Acc: 63.73%
Epoch: 09
	Train Loss: 0.555 | Train Acc: 80.30%
	 Val. Loss: 1.171 |  Val. Acc: 63.68%
Epoch: 10
	Train Loss: 0.549 | Train Acc: 80.29%
	 Val. Loss: 1.228 |  Val. Acc: 64.39%
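
To see which epoch the saved checkpoint corresponds to, the checkpoint rule from the training loop can be replayed on the validation losses printed above. This is a small sketch that only reuses the logged numbers; no training is run.

```python
# Validation losses copied from the training log above (epochs 1 to 10)
valid_losses = [1.381, 1.181, 1.183, 1.136, 1.119, 1.086, 1.108, 1.113, 1.171, 1.228]

best_valid_loss = float('inf')
best_epoch = None
for epoch, valid_loss in enumerate(valid_losses, start=1):
    if valid_loss < best_valid_loss:  # same rule as the training loop
        best_valid_loss = valid_loss
        best_epoch = epoch

print(best_epoch, best_valid_loss)
```

The lowest validation loss occurs at epoch 6, so `tfm-best.pt` holds the weights from that epoch. After it, the validation loss rises while the training loss keeps falling, which suggests the model starts to overfit.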


Evaluate the model on the test data.

In [34]:
model.load_state_dict(torch.load('tfm-best.pt'))
test_loss, test_acc = evaluate(model, test_iterator, loss_function)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 1.058 | Test Acc: 63.52%
