<a href="https://colab.research.google.com/github/Jun0S2/AIBootcamp/blob/main/%EC%8B%AC%ED%99%942.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer 실습

이번 실습에서는 감정 분석 task에 RNN 대신 Transformer를 구현하여 적용해 볼 것입니다.
Library import나 dataloader 생성은 RNN 실습 때와 똑같기 때문에 설명은 넘어가도록 하겠습니다.

In [29]:
!pip install datasets sacremoses



In [46]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


In [47]:
class Transformer(nn.Module):
    def __init__(self, input_dim, d_model, dff, n_heads, n_layers, dropout_rate=0.1):
        super().__init__()

        self.layers = nn.ModuleList([
            TransformerLayer(input_dim, d_model, dff, n_heads, dropout_rate)
            for _ in range(n_layers)
        ])

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return x


## Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.
여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.
구현은 다음과 같습니다.

In [48]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

## [Mycode] MHA
* Q,K,V 분리 및 분할
* QK^T/rootD' 계산 후 softmax 로 정규화
* 결과 값 병합하고 다시 integration

In [49]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from math import sqrt

class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.input_dim = input_dim
        self.d_model = d_model
        self.n_heads = n_heads
        self.depth = d_model // n_heads

        # Linear layers for Q, K, V
        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)

        # Output linear layer
        self.dense = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.size()

        # Compute Q, K, V
        Q = self.wq(x)  # (B, S, D)
        K = self.wk(x)  # (B, S, D)
        V = self.wv(x)  # (B, S, D)

        # Reshape to (B, H, S, D')
        Q = Q.view(batch_size, seq_len, self.n_heads, self.depth).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.n_heads, self.depth).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.n_heads, self.depth).transpose(1, 2)

        # Scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-1, -2)) / sqrt(self.depth)  # (B, H, S, S)

        if mask is not None:
            mask = mask.unsqueeze(1)  # Adjust mask shape to (B, 1, 1, S)
            scores += mask * -1e9

        attention_weights = F.softmax(scores, dim=-1)  # (B, H, S, S)
        attention_output = torch.matmul(attention_weights, V)  # (B, H, S, D')

        # Concatenate heads and reshape to (B, S, D)
        attention_output = attention_output.transpose(1, 2).contiguous()
        attention_output = attention_output.view(batch_size, seq_len, self.d_model)

        # Apply final linear layer
        output = self.dense(attention_output)  # (B, S, D)

        return output

## [MyCode] TransformerLayer
* MultiHeadAttention 적용
  * 기존 SelfAttention 모듈을 MultiHeadAttention으로 확장
=>  입력 특징을 여러 하위 공간으로 분할하여 더 복잡한 관계를 학습
* Feed-Forward Network (FFN)
  * d_model → dff → d_model 변환을 통해 내부 표현 공간을 확장
* Layer Normalization과 Dropout을 적용

In [50]:
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff, n_heads, dropout_rate=0.1):
    super().__init__()

    self.mha = MultiHeadAttention(input_dim, d_model, n_heads)

    # self.input_dim = input_dim
    # self.d_model = d_model
    # self.dff = dff
    # self.sa = SelfAttention(input_dim, d_model)

    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )
 #  Layer Normalization, Dropout, Residual Connection
    self.layernorm1 = nn.LayerNorm(d_model)
    self.layernorm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout_rate)

  def forward(self, x, mask):
        # Multi-Head Attention
        x1 = self.mha(x, mask)
        x1 = self.dropout(x1)
        x1 = self.layernorm1(x1 + x)  # Residual connection

        # Feed-Forward Network#
        x2 = self.ffn(x1)
        x2 = self.dropout(x2)
        x2 = self.layernorm2(x2 + x1)  # Residual connection

        return x2

대부분은 Transformer 챕터에서 배운 수식들을 그대로 구현한 것에 불과합니다.
차이점은 `mask`의 존재여부입니다.
이전 챕터에서 우리는 가변적인 text data들에 padding token을 붙여 하나의 matrix로 만든 방법을 배웠습니다.
실제 attention 계산에서는 이를 무시해주기 위해 mask를 만들어 제공해주게 됩니다.
여기서 mask의 shape은 (B, S, 1)로, 만약 `mask[i, j] = True`이면 그 변수는 padding token에 해당한다는 뜻입니다.
이러한 값들을 무시해주는 방법은 shape이 (B, S, S)인 `score`가 있을 때(수업에서 배운 $A$와 동일) `score[i, j]`에 아주 작은 값을 더해주면 됩니다. 아주 작은 값은 예를 들어 `-1000..00 = -1e9` 같은 것이 있습니다.
이렇게 작은 값을 더해주고 나면 softmax를 거쳤을 때 0에 가까워지기 때문에 weighted sum 과정에서 padding token에 해당하는 `v` 값들을 무시할 수 있게 됩니다.

다음은 self-attention과 feed-forward layer를 구현한 모습입니다.

보시다시피 self-attention의 구현이 어렵지, Transformer layer 하나 구현하는 것은 수업 때 다룬 그림과 크게 구분되지 않는다는 점을 알 수 있습니다.

**굵은 텍스트**## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다:

In [51]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


## [MyCode]

* Mask 생성 시 to(x.device)를 추가해 입력 데이터와 동일한 장치(CPU/GPU)로 이동하도록 수정->> 아래 학습 오류 부분 수정을 위해 (CPU 관련 에러로 추가)

*  classification에서 CLS 토큰(x[:, 0])을 사용해 최종 분류 결과 계산.

In [56]:
class TextClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, dff, n_heads):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff
        self.n_heads = n_heads

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
        self.layers = nn.ModuleList([
            TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)
        ])
        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        # Mask 생성
        mask = (x == tokenizer.pad_token_id).unsqueeze(1).to(x.device)  # mask 생성 및 이동
        seq_len = x.shape[1]

        # Embedding 및 Positional Encoding 추가
        x = self.embedding(x)  # (B, S, D)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]

        # Transformer 레이어 통과
        for layer in self.layers:
            x = layer(x, mask)

        # 첫 번째 토큰의 벡터 사용 (CLS 토큰)
        x = x[:, 0]  # (B, D)
        x = self.classification(x)  # (B, 1)

        return x


#model = TextClassifier(len(tokenizer), 32, 2, 32)
model = TextClassifier(len(tokenizer), d_model=32, n_layers=2, dff=32, n_heads=4)

Positional encoding은 `angle_rads`를 구현하는 과정에서 모두 구현이 되었습니다. 여기서 `angle_rads`의 shape은 (S, D)입니다.
우리는 일반적으로 batch로 주어지는 shape이 (B, S, D)인 tensor를 다루기 때문에 마지막에 None을 활용하여 shape을 (1, S, D)로 바꿔주게됩니다.

위에서 구현한 `TransformerLayer`와 positional encoding을 모두 합친 모습은 다음과 같습니다:

기존과 다른 점들은 다음과 같습니다:
1. `nn.ModuleList`를 사용하여 여러 layer의 구현을 쉽게 하였습니다.
2. Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 `x[:, 0]`입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

## 학습

학습하는 코드는 기존 실습들과 동일하기 때문에 마지막 결과만 살펴보도록 하겠습니다.

In [53]:
# 하이퍼파라미터 설정 및 모델 생성
input_dim = 32   # 입력 임베딩 차원
d_model = 128    # 모델의 차원
dff = 256        # FFN 내부 차원
n_heads = 4      # Multi-Head Attention 헤드 수
n_layers = 5     # Transformer 레이어 수
dropout_rate = 0.1

# 5-layer 4-head Transformer 생성
model = Transformer(input_dim, d_model, dff, n_heads, n_layers, dropout_rate).to('cuda')

In [54]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [55]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [58]:
n_epochs = 50
model = TextClassifier(len(tokenizer), d_model=32, n_layers=2, dff=32, n_heads=4).to('cuda')

for epoch in range(n_epochs):
    total_loss = 0.0
    model.train()
    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

        optimizer.zero_grad()

        preds = model(inputs)  # mask는 내부에서 생성
        loss = loss_fn(preds[..., 0], labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{n_epochs} | Loss: {total_loss:.4f}")

    with torch.no_grad():
        model.eval()
        train_acc = accuracy(model, train_loader)
        test_acc = accuracy(model, test_loader)
        print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch 1/50 | Loss: 288.8340
Epoch 2/50 | Loss: 288.8491
Epoch 3/50 | Loss: 288.9631
Epoch 4/50 | Loss: 289.0723
Epoch 5/50 | Loss: 288.8574
Epoch 6/50 | Loss: 289.1306
Epoch 7/50 | Loss: 288.7545
Epoch 8/50 | Loss: 288.8923
Epoch 9/50 | Loss: 288.8682
Epoch 10/50 | Loss: 288.8995
Epoch 11/50 | Loss: 289.0212
Epoch 12/50 | Loss: 288.8761
Epoch 13/50 | Loss: 288.9787
Epoch 14/50 | Loss: 288.9572
Epoch 15/50 | Loss: 288.8725
Epoch 16/50 | Loss: 288.7774
Epoch 17/50 | Loss: 288.9102
Epoch 18/50 | Loss: 288.8093
Epoch 19/50 | Loss: 289.0515
Epoch 20/50 | Loss: 289.0614
Epoch 21/50 | Loss: 288.7908
Epoch 22/50 | Loss: 288.8718
Epoch 23/50 | Loss: 288.6842
Epoch 24/50 | Loss: 288.7606
Epoch 25/50 | Loss: 288.8488
Epoch 26/50 | Loss: 288.8099
Epoch 27/50 | Loss: 289.0539
Epoch 28/50 | Loss: 288.9657
Epoch 29/50 | Loss: 288.7295
Epoch 30/50 | Loss: 288.9545
Epoch 31/50 | Loss: 288.9628
Epoch 32/50 | Loss: 289.0022
Epoch 33/50 | Loss: 288.7498
Epoch 34/50 | Loss: 288.7739
Epoch 35/50 | Loss: 288

##[Feedback 요청] 손실 값이 변화하지 않는 원인과 정확도가 개선되지 않는 이유를 알고 싶습니다

모델 학습 중에 문제가 생겨 도움을 요청드리고자 합니다. 현재 Transformer 기반 감정 분석 모델을 학습하고 있는데, Train/Test accuracy가 계속 0.500으로 고정되어 있으며, Loss 값 또한 거의 변하지 않는 상태입니다

학습이 안정적으로 진행되며 RNN보다 빨리 수렴하는 것을 확인할 수 있습니다.
하지만 test 정확도가 RNN보다 낮은 것을 보았을 때, overfitting에 취약하다는 것을 알 수 있습니다.