# 주어진 문장에서 나올 다음 단어를 예측하는 모델 구현

데이터 셋 설치

In [None]:
!pip install datasets
# !pip install sacremoses 추가 설치
!pip install sacremoses

Collecting datasets
  Downloading datasets-3.0.0-py3-none-any.whl.metadata (19 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.0.0-py3-none-any.whl (474 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m474.3/474.3 kB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (39.9 MB)
[2K 

In [None]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
#tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')
# 에러로 인한 아래 구문으로 대체
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')



def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

## Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.
여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.
구현은 다음과 같습니다.

In [None]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

대부분은 Transformer 챕터에서 배운 수식들을 그대로 구현한 것에 불과합니다.
차이점은 `mask`의 존재여부입니다.
이전 챕터에서 우리는 가변적인 text data들에 padding token을 붙여 하나의 matrix로 만든 방법을 배웠습니다.
실제 attention 계산에서는 이를 무시해주기 위해 mask를 만들어 제공해주게 됩니다.
여기서 mask의 shape은 (B, S, 1)로, 만약 `mask[i, j] = True`이면 그 변수는 padding token에 해당한다는 뜻입니다.
이러한 값들을 무시해주는 방법은 shape이 (B, S, S)인 `score`가 있을 때(수업에서 배운 $A$와 동일) `score[i, j]`에 아주 작은 값을 더해주면 됩니다. 아주 작은 값은 예를 들어 `-1000..00 = -1e9` 같은 것이 있습니다.
이렇게 작은 값을 더해주고 나면 softmax를 거쳤을 때 0에 가까워지기 때문에 weighted sum 과정에서 padding token에 해당하는 `v` 값들을 무시할 수 있게 됩니다.

다음은 self-attention과 feed-forward layer를 구현한 모습입니다.

In [None]:
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.dff = dff

    self.sa = SelfAttention(input_dim, d_model)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )

  def forward(self, x, mask):
    x = self.sa(x, mask)
    x = self.ffn(x)

    return x

보시다시피 self-attention의 구현이 어렵지, Transformer layer 하나 구현하는 것은 수업 때 다룬 그림과 크게 구분되지 않는다는 점을 알 수 있습니다.

## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다:

In [None]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


Positional encoding은 `angle_rads`를 구현하는 과정에서 모두 구현이 되었습니다. 여기서 `angle_rads`의 shape은 (S, D)입니다.
우리는 일반적으로 batch로 주어지는 shape이 (B, S, D)인 tensor를 다루기 때문에 마지막에 None을 활용하여 shape을 (1, S, D)로 바꿔주게됩니다.

위에서 구현한 `TransformerLayer`와 positional encoding을 모두 합친 모습은 다음과 같습니다:

In [None]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)])
    self.classification = nn.Linear(d_model, 1)

  def forward(self, x):
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :]
    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0]
    x = self.classification(x)

    return x


model = TextClassifier(len(tokenizer), 32, 2, 32)

기존과 다른 점들은 다음과 같습니다:
1. `nn.ModuleList`를 사용하여 여러 layer의 구현을 쉽게 하였습니다.
2. Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 `x[:, 0]`입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

## 학습

학습하는 코드는 기존 실습들과 동일하기 때문에 마지막 결과만 살펴보도록 하겠습니다.

In [None]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [None]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [None]:
# n_epochs = 50 50번 확인 했음, 시간 상 축소후 실행
n_epochs = 3

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 227.82510432600975
Epoch   1 | Train Loss: 177.217821508646
Epoch   2 | Train Loss: 152.84329444169998


학습이 안정적으로 진행되며 RNN보다 빨리 수렴하는 것을 확인할 수 있습니다.
하지만 test 정확도가 RNN보다 낮은 것을 보았을 때, overfitting에 취약하다는 것을 알 수 있습니다.

#준비
- [ ]  Multi-head attention(MHA) 구현
    - Self-attention module을 MHA로 확장해주시면 됩니다. 여기서 MHA는 다음과 같이 구현합니다.

        2. $A = QK^T/\sqrt{D'} \in \mathbb{R}^{H \times S \times S}$를 기존의 self-attention과 똑같이 계산합니다. 이 부분은 코드 수정이 필요 없습니다.
        3. Mask를 더합니다. 기존과 $A$의 shape이 달라졌기 때문에 dimension을 어떻게 맞춰줘야할지 생각해줘야 합니다.
        4. $\hat{x} = \textrm{Softmax}(A)V \in \mathbb{R}^{H \times S \times D'}$를 계산해주고 transpose와 reshape을 통해 $\hat{x} \in \mathbb{R}^{S \times D}$의 shape으로 다시 만들어줍니다.
        5. 기존과 똑같이 $\hat{x} = \hat{x} W_o$를 곱해줘서 마무리 해줍니다. 이 또한 코드 수정이 필요 없습니다.

**SelfAttention module**

In [None]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

**Multi-head attention(MHA) 구현**

       1. 기존의 $W_q, W_k, W_v$를 사용하여 $Q, K, V$를 생성합니다. 이 부분은 코드 수정이 필요 없습니다.

            -  $Q, K, V \in \mathbb{R}^{S \times D}$가 있을 때, 이를 $Q, K, V \in \mathbb{R}^{S \times H \times D’}$으로 reshape 해줍니다.
            여기서 $H$는 `n_heads`라는 인자로 받아야 하고, $D$가 $H$로 나눠 떨어지는 값이여야 하는 제약 조건이 필요합니다.
            $D = H \times D’$입니다.

            -  $Q, K, V$를 $Q, K, V \in \mathbb{R}^{H \times S \times D’}$의 shape으로 transpose해줍니다.

In [None]:
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, n_heads):
        super(MultiHeadAttention, self).__init__()


        self.embed_dim = embed_dim
        self.n_heads = n_heads
        self.head_dim = embed_dim // n_heads

        self.q = nn.Linear(embed_dim, embed_dim)
        self.k = nn.Linear(embed_dim, embed_dim)
        self.v = nn.Linear(embed_dim, embed_dim)
        self.out = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        #batch_size, seq_length, embed_dim = x.size()
        batch_size, seq_length = x.size()

        # 1. $Q, K, V$를 reshape 먼저, $Q$, $K$, $V$는 이미 주어진 $W_q$, $W_k$, $W_v$에 의해 생성됩니다.
        # 이를 MHA로 확장하기 위해, Q, K, V를 n_heads(헤드 수)에 맞춰 reshaping과 transpose 작업을 수행해야 합니다.
        Q = self.q(x)
        K = self.k(x)
        V = self.v(x)

        Q = Q.view(batch_size, seq_length, self.n_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, seq_length, self.n_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, seq_length, self.n_heads, self.head_dim).transpose(1, 2)

        # 2. $A = QK^T/\sqrt{D'} \in \mathbb{R}^{H \times S \times S}$를 기존의 self-attention과 똑같이 계산합니다.
        attn_weights = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)

        # 3, 4 $\hat{x} = \textrm{Softmax}(A)V \in \mathbb{R}^{H \times S \times D'}$를 계산해주고
        # transpose와 reshape을 통해 $\hat{x} \in \mathbb{R}^{S \times D}$의 shape으로 다시 만들어줍니다.
        # 기존과 똑같이 $\hat{x} = \hat{x} W_o$를 곱해줘서 마무리 해줍니다. 이 또한 코드 수정이 필요 없습니다.
        attn_probs = torch.softmax(attn_weights, dim=-1)
        attn_output = torch.matmul(attn_probs, V)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_length, embed_dim)

        output = self.out(attn_output)

        return output


# 목표
Layer normalization, dropout, residual connection 구현

- 다시 `TransformerLayer` class로 돌아와서 과제를 진행하시면 됩니다.
- Attention module을 $MHA$, feed-forward layer를 $FFN$이라고 하겠습니다.
- 기존의 구현은 다음과 같습니다:
    
    ```python
    # x, mask is given
    
    x1 = MHA(x, mask)
    x2 = FFN(x1)
    
    return x2
    ```
    
- 다음과 같이 수정해주시면 됩니다.
    
    ```python
    # x, mask is given
    
    x1 = MHA(x, mask)
    x1 = Dropout(x1)
    x1 = LayerNormalization(x1 + x)
    
    x2 = FFN(x1)
    x2 = Dropout(x2)
    x2 = LayerNormalization(x2 + x1)
    
    return x2
    ```
    
- 여기서 `x1 + x`와 `x2 + x1`에 해당하는 부분들은 residual connection이라고 부릅니다.

In [None]:

class TransformerLayer(nn.Module):
    def __init__(self, embed_dim, n_heads, ff_dim, dropout_rate=0.1):
        super(TransformerLayer, self).__init__()

        # Multi-head attention 모듈 인스턴스 선언
        self.mha = MultiHeadAttention(embed_dim, n_heads)

        # Feed-Forward Network (FFN)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim)
        )

        # Layer 정규화
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, mask=None):
        # MHA 모듈 연동, dropout, Layer 정규화 처리
        x1 = self.mha(x)
        x1 = self.dropout(x1)
        x1 = self.norm1(x1 + x)  # Residual connection (x1 + x) 처리

        # FFN, dropout, Layer 정규화 처리
        x2 = self.ffn(x1)
        x2 = self.dropout(x2)
        x2 = self.norm2(x2 + x1)  # Residual connection (x2 + x1) 처리

        return x2



- [ ]  5-layer 4-head Transformer
- 기존 실습에서 사용한 hyper-parameter들과 위에서 구현한 Transformer를 가지고 5-layer 4-head Transformer의 성능 결과를 report해주시면 됩니다.

In [None]:
# hyper-parameter 정리
embed_dim = 256  # 임베딩 차원
ff_dim = 512     # FFN 차원
n_heads = 4    # Attention 헤드 수
dropout_rate = 0.1  # 드롭아웃 비율
n_layers = 5   # Transformer 레이어 수


In [None]:

class Transformer(nn.Module):
    def __init__(self, embed_dim, ff_dim, n_heads, n_layers, dropout_rate=0.1):
        super(Transformer, self).__init__()

        # Stack multiple Transformer layers
        self.layers = nn.ModuleList([
            TransformerLayer(embed_dim, n_heads, ff_dim, dropout_rate) for _ in range(n_layers)
        ])

        # Output linear layer (classification layer)
        self.classifier = nn.Linear(embed_dim, 2)  # 예를 들어 2개의 클래스로 분류하는 경우

    def forward(self, x, mask=None):
        # Pass through all Transformer layers
        for layer in self.layers:
            x = layer(x, mask)  # 각 레이어를 차례로 적용

        # Classifier (for classification tasks)
        logits = self.classifier(x[:, 0, :])  # [CLS] 토큰에 해당하는 첫 번째 위치의 벡터 사용

        return logits


**학습관련 dataset 준비 및 dataloader 생성**

**학습 및 평가 관련 처리**

In [None]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [None]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [None]:
n_epochs = 50
# n_epochs = 3

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 84.5472870618105
Epoch   1 | Train Loss: 66.91610380634665
Epoch   2 | Train Loss: 55.285802790895104
Epoch   3 | Train Loss: 44.56006589625031
Epoch   4 | Train Loss: 36.29273143503815
Epoch   5 | Train Loss: 30.68852668395266
Epoch   6 | Train Loss: 25.627538420492783
Epoch   7 | Train Loss: 20.254934411495924
Epoch   8 | Train Loss: 17.598864362109452
Epoch   9 | Train Loss: 14.972672131494619
Epoch  10 | Train Loss: 14.811792544322088
Epoch  11 | Train Loss: 13.78137490991503
Epoch  12 | Train Loss: 12.33034927980043
Epoch  13 | Train Loss: 14.661628314759582
Epoch  14 | Train Loss: 9.244419031077996
Epoch  15 | Train Loss: 10.575257675664034
Epoch  16 | Train Loss: 6.608695031362004
Epoch  17 | Train Loss: 9.6583340173529
Epoch  18 | Train Loss: 6.843037497863406
Epoch  19 | Train Loss: 6.69497034244705
Epoch  20 | Train Loss: 9.392803528811783
Epoch  21 | Train Loss: 6.821096960949944
Epoch  22 | Train Loss: 6.995084764941566
Epoch  23 | Train Loss: 7.3466

##과제 B. 주어진 문장에서 나올 다음 단어를 예측하는 모델 구현

- [ ]  Last word prediction dataset 준비
    - 기존의 IMDB dataset을 그대로 활용합니다.
    - `collate_fn` 함수에 다음 수정사항들을 반영하면 됩니다.
        - Label은 text를 token으로 변환했을 때 마지막 token의 id로 설정합니다.
        - 입력 data는 마지막 token을 제외한 나머지 token들의 list로 설정합니다.
    - `from torch.nn.utils.rnn import pad_sequence`를 import해서 사용하셔도 좋습니다.
    - Truncation은 기존과 똑같이 진행하시면 됩니다.

In [None]:
# 데이터 로드 및 전처리 재실행
import torch
import torch.nn as nn
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from torch.nn.utils.rnn import pad_sequence
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
#tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')
# 에러로 인한 아래 구문으로 대체
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')



def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

In [None]:
# 텍스트 토크나이즈 및 전처리
def tokenize_function(examples):
    return tokenizer(examples['text'], padding="max_length", truncation=True, max_length=128)

tokenized_datasets = ds.map(tokenize_function, batched=True)

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [None]:
# collate_fn 수정
def collate_fn(batch):
    input_ids_list = []
    labels_list = []

    for item in batch:
        input_ids = torch.tensor(item['input_ids'])

        # 마지막 토큰을 레이블로 설정하고, 나머지 토큰은 입력 데이터로 설정
        label = input_ids[-1]  # 마지막 토큰
        input_ids = input_ids[:-1]  # 마지막 토큰을 제외한 나머지

        input_ids_list.append(input_ids)
        labels_list.append(label)

    # 입력 데이터를 패딩하여 동일한 길이로 맞추기
    input_ids_padded = pad_sequence(input_ids_list, batch_first=True, padding_value=tokenizer.pad_token_id)
    labels = torch.tensor(labels_list)

    return input_ids_padded, labels


In [None]:
# 데이터 로더 생성
# IMDb 데이터셋을 위한 DataLoader를 생성합니다. 여기서 collate_fn에 위에서 정의한 함수를 전달합니다.

from torch.utils.data import DataLoader

# IMDb 데이터셋 PyTorch 형식으로 변환
train_dataset = tokenized_datasets['train'].with_format("torch")
test_dataset = tokenized_datasets['test'].with_format("torch")

# DataLoader 생성 (collate_fn 적용)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)



- [ ]  Loss function 및 classifier output 변경
    - 마지막 token id를 예측하는 것이기 때문에 binary classification이 아닌 일반적인 classification 문제로 바뀝니다. MNIST 과제에서 했던 것 처럼 `nn.CrossEntropy` loss와 `TextClassifier`의 출력 차원을 잘 조정하여 task를 풀 수 있도록 수정하시면 됩니다.

In [None]:

class TransformerLayer(nn.Module):
    def __init__(self, embed_dim, n_heads, ff_dim, dropout_rate=0.1):
        super(TransformerLayer, self).__init__()

        # Multi-head attention 모듈 인스턴스 선언
        self.mha = MultiHeadAttention(embed_dim, n_heads)

        # Feed-Forward Network (FFN)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim)
        )

        # Layer 정규화
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, mask=None):
        # MHA 모듈 연동, dropout, Layer 정규화 처리
        x1 = self.mha(x)
        x1 = self.dropout(x1)
        x1 = self.norm1(x1 + x)  # Residual connection (x1 + x) 처리

        # FFN, dropout, Layer 정규화 처리
        x2 = self.ffn(x1)
        x2 = self.dropout(x2)
        x2 = self.norm2(x2 + x1)  # Residual connection (x2 + x1) 처리

        return x2

In [None]:
import torch
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, embed_dim, ff_dim, num_heads, num_layers, vocab_size, dropout_rate=0.1):
        super(Transformer, self).__init__()

        # Embedding Layer 추가 (입력 토큰을 임베딩 벡터로 변환)
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # Stack multiple Transformer layers
        self.layers = nn.ModuleList([
            TransformerLayer(embed_dim, num_heads, ff_dim, dropout_rate) for _ in range(num_layers)
        ])

        # Output classifier: predict the token ID (output size = vocab_size)
        self.classifier = nn.Linear(embed_dim, vocab_size)  # 토큰을 예측하기 위해 vocabulary 크기로 출력

    def forward(self, x, mask=None):
        # x는 입력 시퀀스 (token IDs), 먼저 임베딩으로 변환
        x = self.embedding(x)  # 입력을 임베딩 벡터로 변환 (batch_size, seq_length, embed_dim)

        # Pass through all Transformer layers
        for layer in self.layers:
            x = layer(x, mask)  # 각 레이어를 차례로 적용

        # Classifier: 마지막 단어 예측 (vocab_size로 출력)
        logits = self.classifier(x[:, -1, :])  # 마지막 토큰의 hidden state를 사용

        return logits


In [None]:
# 손실 함수 및 옵티마이저 정의
loss_fn = nn.CrossEntropyLoss()
vocab_size = tokenizer.vocab_size
model = Transformer(embed_dim=512, ff_dim=2048, n_heads=4, n_layers=5, vocab_size=vocab_size)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)


- [ ]  학습 결과 report
    - 과제 A에서 사용한 것과 동일한 모델로 last word prediction을 학습하시면 됩니다.

In [None]:
# 학습 루프
def train_epoch(model, data_loader, loss_fn, optimizer, device):
    model.train()
    total_loss = 0
    for batch in data_loader:
        input_ids = batch[0].to(device)  # Collate function에서 입력받은 input_ids
        labels = batch[1].to(device)     # 마지막 토큰 ID

        # Forward pass
        outputs = model(input_ids)
        loss = loss_fn(outputs, labels)  # CrossEntropyLoss 사용

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(data_loader)
    return avg_loss

# 평가 함수
def evaluate(model, data_loader, loss_fn, device):
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch[0].to(device)
            labels = batch[1].to(device)

            # Forward pass
            outputs = model(input_ids)
            loss = loss_fn(outputs, labels)
            total_loss += loss.item()

            # 예측한 라벨과 실제 라벨 비교 (logits에서 가장 높은 값 선택)
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()

    avg_loss = total_loss / len(data_loader)
    accuracy = correct / len(data_loader.dataset)
    return avg_loss, accuracy


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
epochs = 3

for epoch in range(epochs):
    print(f'Epoch {epoch+1}/{epochs}')

    # 학습
    train_loss, train_accuracy = train_epoch(model, train_loader, loss_fn, optimizer, device)
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # 평가
    test_loss, test_accuracy = evaluate(model, test_loader, loss_fn, device)
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

Epoch 1/3


ValueError: not enough values to unpack (expected 3, got 2)