# 📌[2주차/기본] 주어진 문장에서 나올 다음 단어를 예측하는 모델 구현

이번 과제에서는 Transformer를 last word prediction이라는 task에 적용합니다.
Last word prediction은 Token list가 주어졌을 때, 다음으로 오는 token을 예측하는 task로, 추후 등장할 LLM의 핵심입니다.

In [18]:
!pip install datasets sacremoses



In [19]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

### 📄 데이터 로딩

IMDb (Internet Movie database)

train 25000개 와 test 25000개로 이루어져 있지만 이 중 5%만 사용한다.

#### 🔍 라벨
0 : 부정적인 리뷰
1 : 긍정적인 리뷰

#### 📦 샘플
```python
{
  'text': "I watched this movie with my family and we all hated it. The story was weak...",
  'label': 0
}
```

In [20]:
train_ds = load_dataset("stanfordnlp/imdb", split="train[:5%]")
test_ds = load_dataset("stanfordnlp/imdb", split="test[:5%]")

🔤 tokenizer 설정

huggingface에서 bert-base-uncased 모델을 불러와서 설정을 한다.

* 토큰화 방식
 * BERT : 단어를 서브워드 단위로 분해
 * BPE : 자주 등장하는 문자쌍을 병합
 * SentencePiece : 공백까지 포함해서 분해

* 대소문자 처리
 * uncased : 모두 소문자로 치환
 * cased : 대소문자 유지



In [21]:
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


📦 collate_fn

text 데이터는 샘플마다 문장 길이가 다르기 때문에 padding이 필요하다
* tokenizer
 * padding=True : 짧은 문장도 padding을 사용해서 일정한 길이로 맞춰준다..
 * truncation : 너무 긴 문장은 max_len 기준으로 자름

🌟 기존 감정 분류와 달라진 점

기본 과제에선 label에 input_ids[-2] 를 넣어주고 있는데, 이는 마지막 단어 예측이 우리 모델의 목표이기 때문이다.


In [22]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-2])
    texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-2]))

  texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)

## ✏️ Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.

여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.

* forward에서 필요한 입력
 * x : 입력 시퀀스
 * mask : 패딩 토큰 무시하거나 casual attention 하는 경우 필요


In [23]:
from torch import nn
from math import sqrt

In [24]:
class SelfAttention(nn.Module):
    def __init__(self, input_dim, d_model):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model

        self.wq = nn.Linear(input_dim, d_model) #in : (B,S,D) output: (B,S,d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)

        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        q = self.wq(x) #(B,S,d_model)
        k = self.wk(x)
        v = self.wv(x)

        score= torch.matmul(q, k.transpose(-1,-2)) # 뒤에서 첫번째 두번째를 바꿔준다 k.transpose = (B,d_model,S) score = (B,S,S)
        score = score / sqrt(self.d_model)

        if mask is not None:
            score = score + (mask * -1e9)

        score = self.softmax(score)
        result = torch.matmul(score, v)
        result = self.dense(result)

        return result

대부분 transformer 구조에 대해 그냥 구현한 것에 불과하지만 mask는 새로운 개념

단어와 단어 사이의 관계도를 보는 Attention score에서 실제 단어와 패딩 단어 사이의 관계도를 고려할 필요가 전혀 없기 때문에 -1e-9를 더하여, 소프트맥스에서 확률 0이 출력되도록 만든 것입니다

---
### 🧱 TransformerLayer

* Self-Attention : 이미 구현해둔 class 사용
* Feed-Forward Network (FFN) : MLP 구조 (Linear > ReLU > Linear)
  * 각 토큰 하나하나에 대해 독립적으로 처리하는 fully connected network


🧠 요약

Attention : token간 관계 표현 개선

FFN : 각 토큰 자체의 표현 개선


In [25]:
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.dff = dff

    self.sa = SelfAttention(input_dim, d_model)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )

  def forward(self, x, mask):
    x = self.sa(x, mask)
    x = self.ffn(x)

    return x

## 🧮 Positional encoding

* 순서 정보를 인코딩해서 모델에 넣어주는 기술
* 각 위치마다 고유한 벡터를 만들어서 입력 임베딩에 더해주는 방식
* Transformer는 Attention 구조라서 문장을 볼때 병렬로 처리하기 때문에 RNN 처럼 순차적 처리가 되지 않음. 따라서 명시적으로 알려줘야 한다.

```python
x = token_embedding + positional_encoding
```

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

In [26]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


---
## 🔧 LastWordPredictor (basic 과제)

####⁉️ 변경 사항
1. 단어 예측 task인 만큼 tokenizer의 vocab_size로 최종 출력 차원을 조정해준다.
2. 문장의 첫번째 토큰 벡터를 뽑는 코드를 삭제하고 마지막 토큰을 뽑도록 한다

####⁉️ sqrt(d_model) 하는 이유
embedding된 벡터는 작은 값인데, positional encoding은 값이 커서 positional encoding의 영향이 너무 큼 > scale up

---

### 🔧 TextClassifier (실습 내용)

1. 입력 토큰을 임베딩 + 포지셔널 인코딩
2. 여러개의 TransformerLayer 통과
3. 문장의 첫번째 토큰의 벡터를 뽑아서
4. 최종 classification

####⁉️ 첫번째 토큰 벡터 뽑는 이유
Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 x[:, 0]입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

In [27]:
class LastWordPredictor(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model) # d_model의 차원의 벡터로 embedding
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)]) # n_layer만큼 transformerLayer를 쌓음
    self.classification = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    #패딩 토큰을 찾아서 mask 생성
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :] # (B,1,S)

    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, -1]
    x = self.classification(x)

    return x


model = LastWordPredictor(len(tokenizer), 32, 2, 32)

## 학습

#### 🔧 변경사항
* loss_fn 이 BCEWithLigitsLoss 는 이진 분류를 위한 것 이므로 변경
* preds 구하는 법과 label을 정수형으로 변경

In [28]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    preds = torch.argmax(preds, dim=-1)

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [29]:
def model_train(model, n_epochs):
    train_acc_list = []
    test_acc_list = []
    train_losses = []

    for epoch in range(n_epochs):
        total_loss = 0.
        model.train()
        for data in train_loader:
            model.zero_grad()
            inputs, labels = data
            inputs, labels = inputs.to('cuda'), labels.to('cuda')

            preds = model(inputs)
            loss = loss_fn(preds, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        train_losses.append(total_loss)
        print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

        with torch.no_grad():
            model.eval()
            train_acc = accuracy(model, train_loader)
            test_acc = accuracy(model, test_loader)
            train_acc_list.append(train_acc)
            test_acc_list.append(test_acc)
            print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")
    return train_losses, train_acc_list, test_acc_list

In [None]:
from torch.optim import Adam
##hyperparam
n_epochs = 50
lr = 0.001

model = LastWordPredictor(len(tokenizer), 32, 2, 32)
model = model.to('cuda')

loss_fn = nn.CrossEntropyLoss()

optimizer = Adam(model.parameters(), lr=lr)

train_losses, train_acc_list, test_acc_list = model_train(model, n_epochs)

Epoch   0 | Train Loss: 192.90383625030518
Epoch   1 | Train Loss: 103.57606196403503
Epoch   2 | Train Loss: 69.05914735794067
Epoch   3 | Train Loss: 55.8815176486969
Epoch   4 | Train Loss: 51.567612051963806
Epoch   5 | Train Loss: 49.205313205718994
Epoch   6 | Train Loss: 47.639344334602356
Epoch   7 | Train Loss: 46.623597383499146
Epoch   8 | Train Loss: 45.67187321186066
Epoch   9 | Train Loss: 45.51940643787384
Epoch  10 | Train Loss: 45.07234513759613
Epoch  11 | Train Loss: 44.79322838783264
Epoch  12 | Train Loss: 43.99995970726013
Epoch  13 | Train Loss: 43.83711361885071
Epoch  14 | Train Loss: 43.59956157207489
Epoch  15 | Train Loss: 44.11829054355621
Epoch  16 | Train Loss: 43.049736976623535


In [None]:
import matplotlib.pyplot as plt

def plot_metrics(train_losses, train_accs, test_accs):
    epochs = range(1, len(train_losses) + 1)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # Loss plot
    ax1.plot(epochs, train_losses, marker='o', linestyle='-', color='blue')
    ax1.set_title('Training Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.grid(True)

    # Accuracy plot
    ax2.plot(epochs, train_accs, marker='o', linestyle='-', label='Train Accuracy', color='green')
    ax2.plot(epochs, test_accs, marker='x', linestyle='--', label='Test Accuracy', color='red')
    ax2.set_title('Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    ax2.grid(True)

    plt.tight_layout()
    plt.show()


In [None]:
plot_metrics(train_losses, train_acc_list, test_acc_list)

### 💭 결과
* loss가 거의 일정한 수준 유지
* train test accuracy도 전혀 개선되지 않음
* 학습이 제대로 되지 않는 것 같아서 개선할 점 찾아야함

### 🚩 의심가는 점
* 마지막 토큰 뽑는 x[:-1] 에서 padding 고려 안함
* embedding 차원 증가시켜서 성능 개선할 수 있을지
* 아니면 fail나는거 한번 까보기

---


#### 🧪 실험1
패딩 고려한 진짜 마지막 단어 찾기

```
[
  [단어1, 단어2, 단어3, PAD, PAD],     # 문장 길이 3
  [단어4, 단어5, 단어6, 단어7, PAD],   # 문장 길이 4
  [단어8, 단어9, PAD, PAD, PAD]        # 문장 길이 2
]
```
x[:-1] 에서는 PAD 쪽 토큰이 추출된다.

In [None]:
class LastWordPredictor_edit_padding(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model) # d_model의 차원의 벡터로 embedding
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)]) # n_layer만큼 transformerLayer를 쌓음
    self.classification = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    #패딩 토큰을 찾아서 mask 생성
    padd_mask = (x == tokenizer.pad_token_id)
    mask = padd_mask[:, None, :] # (B,1,S)

    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    lengths = (~padd_mask).sum(dim=1) #각 문장의 실제 길이 리스트

    batch_size = x.size(0)
    last_token_output = x[torch.arange(batch_size), lengths -1, :]
    x = self.classification(last_token_output)

    return x

In [None]:
model_with_padding = LastWordPredictor_edit_padding(len(tokenizer), 32, 2, 32)
model_with_padding = model_with_padding.to('cuda')

loss_fn = nn.CrossEntropyLoss()

optimizer = Adam(model_with_padding.parameters(), lr=lr)

train_losses2, train_acc_list2, test_acc_list2 = model_train(model_with_padding, n_epochs)

### 🔍 데이터 확인

In [None]:
from collections import Counter

label_counter = Counter()

for texts, labels in train_loader:
    tokens = tokenizer.convert_ids_to_tokens(labels.numpy())
    label_counter.update(tokens)

print("가장 많이 등장한 레이블 상위 10개:")
for token, freq in label_counter.most_common(10):
    print(f"{token:<10}: {freq}회")
