# Transformer 실습

이번 실습에서는 감정 분석 task에 RNN 대신 Transformer를 구현하여 적용해 볼 것입니다.
Library import나 dataloader 생성은 RNN 실습 때와 똑같기 때문에 설명은 넘어가도록 하겠습니다.

In [1]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.0.1-py3-none-any.whl.metadata (20 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.0.1-py3-none-any.whl (471 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m471.6/471.6 kB[0m [31m20.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (39.9 MB)
[2K

In [2]:
!pip install sacremoses

Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl.metadata (8.3 kB)
Downloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/897.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 kB[0m [31m36.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sacremoses
Successfully installed sacremoses-0.1.1


In [3]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

Downloading: "https://github.com/huggingface/pytorch-transformers/zipball/main" to /root/.cache/torch/hub/main.zip


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]



## Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.
여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.
구현은 다음과 같습니다.

In [4]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

## [과제] Multi-head Attention

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        if input_dim % n_heads != 0:
            raise ValueError("input_dim must be divisible by n_heads")

        self.input_dim = input_dim
        self.d_model = d_model
        self.n_heads = n_heads

        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)
        self.dense = nn.Linear(input_dim, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def split_into_heads(self, tensor, n_heads):
        if len(tensor.shape) is not 3:
            raise ValueError(
                "Input tensor must have 3 dimensions (batch_size, sequence_length, hidden_dim)"
            )

        b, s, d = tensor.shape
        d_prime = d // n_heads
        return tensor.reshape(b, s, n_heads, d_prime).transpose(1, 2)

    def forward(self, x, mask):
        q = self.wq(x)
        q = self.split_into_heads(q, self.n_heads)

        k = self.wk(x)
        k = self.split_into_heads(k, self.n_heads)

        v = self.wv(x)
        v = self.split_into_heads(v, self.n_heads)

        score = q @ k.transpose(-1, -2)  # (B, H, S, D) * (B, H, D, S) = (B, H, S, S)
        score = score / sqrt(self.d_model)

        if mask is not None:
            score = score + (mask.unsqueeze(-2) * -1e9)

        score = self.softmax(score)
        result = score @ v

        result = result.transpose(1, 2)
        result = result.reshape(x.shape)
        result = self.dense(result)

        return result

  if len(tensor.shape) is not 3:


대부분은 Transformer 챕터에서 배운 수식들을 그대로 구현한 것에 불과합니다.
차이점은 `mask`의 존재여부입니다.
이전 챕터에서 우리는 가변적인 text data들에 padding token을 붙여 하나의 matrix로 만든 방법을 배웠습니다.
실제 attention 계산에서는 이를 무시해주기 위해 mask를 만들어 제공해주게 됩니다.
여기서 mask의 shape은 (B, S, 1)로, 만약 `mask[i, j] = True`이면 그 변수는 padding token에 해당한다는 뜻입니다.
이러한 값들을 무시해주는 방법은 shape이 (B, S, S)인 `score`가 있을 때(수업에서 배운 $A$와 동일) `score[i, j]`에 아주 작은 값을 더해주면 됩니다. 아주 작은 값은 예를 들어 `-1000..00 = -1e9` 같은 것이 있습니다.
이렇게 작은 값을 더해주고 나면 softmax를 거쳤을 때 0에 가까워지기 때문에 weighted sum 과정에서 padding token에 해당하는 `v` 값들을 무시할 수 있게 됩니다.

다음은 self-attention과 feed-forward layer를 구현한 모습입니다.

In [6]:
class TransformerLayer(nn.Module):
    def __init__(self, input_dim, d_model, dff, n_heads):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.dff = dff

        self.mha = MultiHeadAttention(input_dim, d_model, n_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff), nn.ReLU(), nn.Linear(dff, d_model)
        )
        self.dropout = nn.Dropout(0.1)
        self.layer_norm = nn.LayerNorm(input_dim)

    """
    Single-head attention
    """
    # def forward(self, x, mask):
    #   x = self.sa(x, mask)
    #   x = self.ffn(x)

    #   return x

    """
    Multi-head attention
    """
    def forward(self, x, mask):
        x1 = self.mha(x, mask)
        x1 = self.dropout(x1)
        x1 = self.layer_norm(x1 + x)

        x2 = self.ffn(x1)
        x2 = self.dropout(x2)
        x2 = self.layer_norm(x2 + x1)

        return x2

보시다시피 self-attention의 구현이 어렵지, Transformer layer 하나 구현하는 것은 수업 때 다룬 그림과 크게 구분되지 않는다는 점을 알 수 있습니다.

## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다:

In [7]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


Positional encoding은 `angle_rads`를 구현하는 과정에서 모두 구현이 되었습니다. 여기서 `angle_rads`의 shape은 (S, D)입니다.
우리는 일반적으로 batch로 주어지는 shape이 (B, S, D)인 tensor를 다루기 때문에 마지막에 None을 활용하여 shape을 (1, S, D)로 바꿔주게됩니다.

위에서 구현한 `TransformerLayer`와 positional encoding을 모두 합친 모습은 다음과 같습니다:

In [8]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff, n_heads):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)])
    self.classification = nn.Linear(d_model, 1)

  def forward(self, x):
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :]
    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0]
    x = self.classification(x)

    return x


n_layers = 5 # 5-layer
n_heads = 4 # 4-heads
model = TextClassifier(len(tokenizer), 32, n_layers, 32, n_heads)

기존과 다른 점들은 다음과 같습니다:
1. `nn.ModuleList`를 사용하여 여러 layer의 구현을 쉽게 하였습니다.
2. Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 `x[:, 0]`입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

## 학습

학습하는 코드는 기존 실습들과 동일하기 때문에 마지막 결과만 살펴보도록 하겠습니다.

In [9]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [10]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [11]:
n_epochs = 50

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 210.35852748155594
Epoch   1 | Train Loss: 140.14069086313248
Epoch   2 | Train Loss: 109.55957312881947
Epoch   3 | Train Loss: 87.03756070137024
Epoch   4 | Train Loss: 64.11789696104825
Epoch   5 | Train Loss: 48.00787900388241
Epoch   6 | Train Loss: 34.36243764590472
Epoch   7 | Train Loss: 27.994321119971573
Epoch   8 | Train Loss: 23.807034112978727
Epoch   9 | Train Loss: 20.456317599862814
Epoch  10 | Train Loss: 20.12737422576174
Epoch  11 | Train Loss: 16.95861612027511
Epoch  12 | Train Loss: 17.522038038354367
Epoch  13 | Train Loss: 15.930031307274476
Epoch  14 | Train Loss: 15.803759128786623
Epoch  15 | Train Loss: 14.886280714068562
Epoch  16 | Train Loss: 14.338303215568885
Epoch  17 | Train Loss: 14.909148051636294
Epoch  18 | Train Loss: 14.783475493313745
Epoch  19 | Train Loss: 13.964656771626323
Epoch  20 | Train Loss: 13.87191060744226
Epoch  21 | Train Loss: 13.896272649988532
Epoch  22 | Train Loss: 11.904839710332453
Epoch  23 | Train 

참고: 이전 출력 (Single-head attention)
```plaintext
Epoch   0 | Train Loss: 242.4758207499981
=========> Train acc: 0.769 | Test acc: 0.744
Epoch   1 | Train Loss: 181.56073209643364
=========> Train acc: 0.824 | Test acc: 0.775
Epoch   2 | Train Loss: 152.99550041556358
=========> Train acc: 0.864 | Test acc: 0.793
Epoch   3 | Train Loss: 129.0777924656868
=========> Train acc: 0.893 | Test acc: 0.804
Epoch   4 | Train Loss: 110.52370748668909
=========> Train acc: 0.911 | Test acc: 0.805

.
.
.

Epoch  45 | Train Loss: 3.369417593916296
=========> Train acc: 0.998 | Test acc: 0.792
Epoch  46 | Train Loss: 4.6130708061791665
=========> Train acc: 0.999 | Test acc: 0.791
Epoch  47 | Train Loss: 2.8299819592712083
=========> Train acc: 0.998 | Test acc: 0.792
Epoch  48 | Train Loss: 3.979393209456248
=========> Train acc: 0.998 | Test acc: 0.792
Epoch  49 | Train Loss: 3.725896252102757
=========> Train acc: 0.999 | Test acc: 0.792
```

학습이 안정적으로 진행되며 RNN보다 빨리 수렴하는 것을 확인할 수 있습니다.
하지만 test 정확도가 RNN보다 낮은 것을 보았을 때, overfitting에 취약하다는 것을 알 수 있습니다.

- drop-out 적용하여 over-fitting이 완화됨

## [참고] Multi-head Attention 다른 구현체

In [12]:
# 기존 SelfAttention 클래스 활용
class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        if input_dim % n_heads != 0:
            raise ValueError("input_dim must be divisible by n_heads")

        d_prime = d_model // n_heads
        self.attentions = nn.ModuleList(
            [SelfAttention(input_dim, d_prime) for _ in range(n_heads)]
        )
        self.dense = nn.Linear(input_dim, input_dim)

    def forward(self, x, mask):
        head_outputs = [attention(x, mask) for attention in self.attentions]
        concatenated_heads = torch.cat(head_outputs, dim=-1)
        # 이렇게 하면 SelfAttention 모듈에서 dense 레이어가 필요없는 것 아닌지?
        output = self.dense(concatenated_heads)

        return output

1. 이전 `MultiHeadAttention` 구현
  - q, k, v 행렬을 reshape
  - 크기가 `n_heads`인 차원을 추가 (tensor rank: 2 -> 3)
  - head 별로 다른 feature를 학습하는 것은, PyTorch의 텐서연산으로 처리됨
  - 모든 head 연산이 병렬로 처리됨

2. 이번 `MultiHeadAttention` 구현
  - 이미 구현되어 있는 `SelfAttention` 재사용
  - `nn.ModuleList`를 활용해 `SelfAttention`모듈을 multi-head로 확장
  - head를 순회하며 연산함
  - 코드는 깔끔해지지만, 성능 저하 발생함
