# Transformer 실습


In [1]:
!pip install datasets sacremoses

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl.metadata (8.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 k

In [5]:
from datasets import load_dataset
from torch.utils.data import DataLoader
import torch

# Load the complete IMDB train and test splits.
# For a quick experiment on a subset, use a sliced split name instead,
# e.g. split="train[:5%]" and split="test[:5%]".
train_ds, test_ds = (
    load_dataset("stanfordnlp/imdb", split=split_name)
    for split_name in ("train", "test")
)

# Fetch the BERT tokenizer through torch.hub.
tokenizer = torch.hub.load(
    'huggingface/pytorch-transformers',
    'tokenizer',
    'bert-base-uncased',
)

# Collate function
def collate_fn(batch):
    """Convert a batch of dataset rows into a pair of tensors.

    Every row is read through its 'text' and 'label' keys. The texts are
    passed to the module-level `tokenizer` with padding=True,
    truncation=True and max_length=400, and only `input_ids` is kept.

    Returns:
        (texts, labels): two `torch.LongTensor`s holding the token ids and
        the labels of the batch.
    """
    max_len = 400
    pairs = [(row['text'], row['label']) for row in batch]
    raw_texts = [text for text, _ in pairs]
    raw_labels = [label for _, label in pairs]

    encoded = tokenizer(raw_texts, padding=True, truncation=True, max_length=max_len)
    texts = torch.LongTensor(encoded.input_ids)
    labels = torch.LongTensor(raw_labels)
    return texts, labels

# Build the DataLoaders: both share batch size and collate function;
# only the training loader shuffles.
_loader_kwargs = dict(batch_size=64, collate_fn=collate_fn)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)


Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


In [6]:
import torch.nn as nn
import numpy as np
from math import sqrt

# Positional encoding
def get_angles(pos, i, d_model):
    """Return the angles later fed to sin/cos by the positional encoding.

    Dimension indices are grouped in pairs (2k and 2k+1 share one exponent),
    giving a rate of 1 / 10000 ** (2k / d_model) that scales the position.

    Args:
        pos: position index or array of positions.
        i: dimension index or array of dimension indices.
        d_model: model width used to normalise the exponent.

    Returns:
        `pos` multiplied by the per-dimension rate (broadcast by NumPy).
    """
    pair_exponent = (2 * (i // 2)) / np.float32(d_model)
    inverse_wavelength = 1 / np.power(10000, pair_exponent)
    return pos * inverse_wavelength

def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) in the table.
        d_model: number of encoding dimensions (columns).

    Returns:
        A `torch.FloatTensor` of shape (1, position, d_model) whose even
        columns hold the sine and odd columns the cosine of the angles
        produced by `get_angles`.
    """
    row_positions = np.arange(position)[:, None]
    column_dims = np.arange(d_model)[None, :]
    table = get_angles(row_positions, column_dims, d_model)

    # Overwrite the angles in place: sine on even columns, cosine on odd ones.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Add a leading axis of size 1 so the table broadcasts over the batch.
    return torch.FloatTensor(table[None, ...])

# Multi-head attention
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values of width `d_model`,
    split into `n_heads` heads of width `d_model // n_heads`, attended per
    head, concatenated again and passed through an output projection.

    Args:
        input_dim: size of the last dimension of the input.
        d_model: total width of the q/k/v projections and of the output.
        n_heads: number of heads; must divide `d_model`.
    """

    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        # The model width has to split evenly across the heads.
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.n_heads = n_heads
        self.d_head = d_model // n_heads  # width of one head

        # Query / key / value projections.
        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)

        # Output projection applied after the heads are concatenated.
        self.wo = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=-1)
        self.dropout = nn.Dropout(0.1)

    def _split_heads(self, projected, batch, seq_len):
        """Reshape (B, S, D) into per-head layout (B, H, S, D')."""
        return projected.view(batch, seq_len, self.n_heads, self.d_head).transpose(1, 2)

    def forward(self, x, mask):
        """Apply self-attention to `x`.

        Args:
            x: tensor of shape (B, S, input_dim).
            mask: None, or a boolean tensor of shape (B, S, S) in which True
                marks attention scores to suppress (filled with -1e9).

        Returns:
            Tensor of shape (B, S, d_model).
        """
        batch, seq_len, _ = x.size()

        queries = self._split_heads(self.wq(x), batch, seq_len)
        keys = self._split_heads(self.wk(x), batch, seq_len)
        values = self._split_heads(self.wv(x), batch, seq_len)

        # Scaled dot-product scores for all heads at once: (B, H, S, S).
        scores = torch.matmul(queries, keys.transpose(-1, -2)) / sqrt(self.d_head)

        if mask is not None:
            # Insert a head axis so the (B, S, S) mask broadcasts over heads.
            scores = scores.masked_fill(mask[:, None, :, :], -1e9)

        weights = self.dropout(self.softmax(scores))

        # Weighted sum of values per head: (B, H, S, D').
        context = torch.matmul(weights, values)

        # Concatenate the heads back together: (B, H, S, D') -> (B, S, D).
        merged = context.transpose(1, 2).contiguous().view(
            batch, seq_len, self.n_heads * self.d_head
        )
        return self.wo(merged)


# Transformer layer
class TransformerLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sub-layer's output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.

    Args:
        input_dim: width of the input passed to the attention projections.
        d_model: width of the attention output, the norms and the block output.
        dff: hidden width of the feed-forward network.
        n_heads: number of attention heads.
    """

    def __init__(self, input_dim, d_model, dff, n_heads):
        super().__init__()
        self.mha = MultiHeadAttention(input_dim, d_model, n_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model),
        )
        self.norm1, self.norm2 = nn.LayerNorm(d_model), nn.LayerNorm(d_model)
        self.dropout1, self.dropout2 = nn.Dropout(0.1), nn.Dropout(0.1)

    def forward(self, x, mask):
        """Run the block on `x`, forwarding `mask` to the attention module."""
        attended = self.dropout1(self.mha(x, mask))
        after_attention = self.norm1(x + attended)

        transformed = self.dropout2(self.ffn(after_attention))
        return self.norm2(after_attention + transformed)

# Full model
class TextClassifier(nn.Module):
    """Transformer encoder that produces one logit per input sequence.

    Token ids are embedded, scaled by sqrt(d_model), offset by a fixed
    sinusoidal positional encoding and passed through `n_layers`
    `TransformerLayer`s. The representation of the first token is projected
    to a single logit.

    Args:
        vocab_size: number of rows in the token-embedding table.
        d_model: embedding width, also used by every transformer layer.
        n_layers: number of stacked transformer layers.
        dff: hidden width of each layer's feed-forward network.
        n_heads: number of attention heads per layer.
        max_len: number of positions in the positional-encoding table
            (default 400). Input sequences must not be longer than this.
        pad_token_id: token id treated as padding when building the
            attention mask. When None (default), `tokenizer.pad_token_id`
            of the module-level tokenizer is looked up on each forward pass.
    """

    def __init__(self, vocab_size, d_model, n_layers, dff, n_heads, max_len=400, pad_token_id=None):
        super().__init__()
        self.max_len = max_len
        self.pad_token_id = pad_token_id
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Fixed table stored as a frozen Parameter (requires_grad=False).
        self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
        self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)])
        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        """Map token ids of shape (B, S) to logits of shape (B, 1)."""
        # Use the explicit pad id when given; otherwise fall back to the global tokenizer.
        pad_id = tokenizer.pad_token_id if self.pad_token_id is None else self.pad_token_id
        seq_len = x.size(1)

        # True where the key position is padding, repeated for every query position: (B, S, S).
        mask = (x == pad_id)
        mask = mask[:, None, :].expand(-1, seq_len, -1)

        x = self.embedding(x) * sqrt(self.embedding.embedding_dim)
        x = x + self.pos_encoding[:, :seq_len]

        for layer in self.layers:
            x = layer(x, mask)

        # Classify from the final representation of the first token.
        return self.classification(x[:, 0])


In [7]:
from torch.optim import Adam

# Use the GPU when one is available; otherwise fall back to the CPU so this
# cell does not crash on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TextClassifier(len(tokenizer), d_model=128, n_layers=5, dff=256, n_heads=4).to(device)
optimizer = Adam(model.parameters(), lr=0.001)
# The model emits raw logits, so the loss applies the sigmoid itself.
loss_fn = nn.BCEWithLogitsLoss()

# Accuracy helper
def accuracy(model, dataloader):
    """Return the fraction of examples that `model` classifies correctly.

    The model is switched to eval mode (and left in it) and run without
    gradient tracking. A logit greater than 0 counts as class 1, otherwise
    class 0. Batches are moved to the device of the model's first parameter,
    so the function works for CPU and GPU models alike; a model without
    parameters receives the batches unmoved.

    Args:
        model: module mapping a batch `x` to logits of shape (B, 1).
        dataloader: iterable of `(x, y)` tensor pairs, `y` holding 0/1 labels.

    Returns:
        Number of correct predictions divided by the number of examples.

    Raises:
        ZeroDivisionError: if `dataloader` yields no examples.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    total, correct = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            if device is not None:
                x, y = x.to(device), y.to(device)
            # squeeze(-1) removes only the logit dimension, so a batch of a
            # single example keeps its batch dimension.
            preds = (model(x) > 0).long().squeeze(-1)
            correct += (preds == y).sum().item()
            total += y.size(0)
    return correct / total

# Training loop
n_epochs = 5
# Send batches to the device the model already lives on instead of assuming CUDA.
device = next(model.parameters()).device

for epoch in range(n_epochs):
    model.train()
    total_loss = 0  # sum of the per-batch losses over the epoch
    for x, y in train_loader:
        x, y = x.to(device), y.float().to(device)
        model.zero_grad()
        # squeeze(-1) drops only the logit dimension: a bare squeeze() would
        # turn a batch of one into a 0-d tensor that no longer matches `y`.
        preds = model(x).squeeze(-1)
        loss = loss_fn(preds, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch} | Train Loss: {total_loss:.4f} | Train Acc: {accuracy(model, train_loader):.3f} | Test Acc: {accuracy(model, test_loader):.3f}")


Epoch 0 | Train Loss: 210.5462 | Train Acc: 0.811 | Test Acc: 0.789
Epoch 1 | Train Loss: 154.6481 | Train Acc: 0.854 | Test Acc: 0.819
Epoch 2 | Train Loss: 137.3433 | Train Acc: 0.885 | Test Acc: 0.840
Epoch 3 | Train Loss: 125.2467 | Train Acc: 0.850 | Test Acc: 0.814
Epoch 4 | Train Loss: 118.3788 | Train Acc: 0.902 | Test Acc: 0.839
