<a href="https://colab.research.google.com/github/KKKKKIKKKK/BaekJoon.java/blob/master/%EA%B3%BC%EC%A0%9CBTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer 실습

In [21]:
!pip install datasets



In [22]:
!pip install sacremoses



#  IMDB 영화 리뷰 데이터셋을 로드하고 처리
- 목적:
IMDB 영화 리뷰 데이터셋을 로드하고, 이를 BERT 모델에 입력할 수 있는 형태로 전처리하는 단계
- 이는 감성 분석(긍정/부정 분류) 같은 자연어 처리 작업을 위한 준비

In [23]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


## Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.
여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.
구현은 다음과 같습니다.

In [24]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

# TransformerLayer를 구현

1. TransformerLayer 클래스 구조
2. Self-Attention 구현
3. Feed-Forward Network (FFN) 구현
4. Forward 패스
5. Mask 처리



In [25]:
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.dff = dff

    self.sa = SelfAttention(input_dim, d_model)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )

  def forward(self, x, mask):
    x = self.sa(x, mask)
    x = self.ffn(x)

    return x

## Positional encoding

$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다:

- max_len = 400, d_model = 256으로 설정하여 positional encoding을 생성


In [26]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


## Transformer 기반의 텍스트 분류기 모델을 구현
- 임베딩 층, 위치 인코딩, 여러 개의 Transformer 층, 그리고 분류 층으로 구성
- 초기화 (init 메소드)
- 순전파 (forward 메소드)
- 이 TextClassifier 모델은 Transformer 아키텍처를 기반으로 한 텍스트 분류기입니다. 입력 텍스트를 토큰화하고, 임베딩하며, 위치 정보를 추가한 후, 여러 Transformer 층을 통과시켜 특징을 추출

In [27]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)])
    self.classification = nn.Linear(d_model, 1)

  def forward(self, x):
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :]
    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0]
    x = self.classification(x)

    return x


model = TextClassifier(len(tokenizer), 32, 2, 32)

## 텍스트 분류 모델의 학습을 위한 준비 단계
- 설명:
이 코드는 텍스트 분류 모델의 학습을 위한 핵심 요소들을 설정합니다. Adam 옵티마이저를 사용하여 모델 파라미터를 효율적으로 업데이트하고, BCEWithLogitsLoss를 통해 이진 분류 문제에 적합한 손실을 계산합니다. 모델을 GPU로 이동시켜 학습 속도를 향상시키며, 0.001의 학습률로 학습을 진행할 준비를 합니다. 이러한 설정은 모델이 IMDB 리뷰 데이터셋에서 감성 분석(긍정/부정 분류)을 효과적으로 학습할 수 있도록 합니다.

In [28]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

## 모델의 정확도를 평가하는 함수를 정의

In [29]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

## 텍스트 분류 모델의 학습 과정을 구현

In [None]:
n_epochs = 50

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

## 과제 B. 주어진 문장에서 나올 다음 단어를 예측하는 모델 구현

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# Multi-Head Attention Module (멀티헤드 어텐션 모듈)
# 여러 개의 attention head를 사용하여 attention을 병렬로 수행하는 부분
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.head_dim = embed_dim // num_heads  # 각 head의 차원 계산

        # head_dim이 num_heads로 나누어떨어지는지 확인하는 assert문
        assert self.head_dim * num_heads == embed_dim, "Embedding dimension must be divisible by number of heads"

        # query, key, value를 위한 선형 변환 정의 (Wq, Wk, Wv)
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)

        # Output projection (출력 투영)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        batch_size, seq_len, embed_dim = x.size()

        # Query, Key, Value 계산 (각각 q_proj, k_proj, v_proj를 사용)
        Q = self.q_proj(x)
        K = self.k_proj(x)
        V = self.v_proj(x)

        # (batch_size, num_heads, seq_len, head_dim) 형태로 변환
        Q = Q.view(batch_size, self.num_heads, seq_len, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, self.num_heads, seq_len, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, self.num_heads, seq_len, self.head_dim).transpose(1, 2)

        # Scaled Dot-Product Attention (QK^T / sqrt(d_k)) 계산
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # 마스크가 있을 경우 마스크 적용 (masking된 위치에는 매우 작은 값으로 처리)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # 소프트맥스를 통해 어텐션 가중치 계산
        attn_weights = F.softmax(scores, dim=-1)
        context = torch.matmul(attn_weights, V)

        # Multi-head attention 결과를 다시 원래 차원으로 되돌림 (원래 형태로 변환)
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)
        return self.out_proj(context)

# Feed Forward Network Module (피드포워드 네트워크 모듈)
# 트랜스포머에서 각 attention 뒤에 나오는 2층의 피드포워드 네트워크
class FeedForwardNetwork(nn.Module):
    def __init__(self, embed_dim, hidden_dim):
        super(FeedForwardNetwork, self).__init__()
        self.fc1 = nn.Linear(embed_dim, hidden_dim)  # 첫 번째 선형 변환
        self.fc2 = nn.Linear(hidden_dim, embed_dim)  # 두 번째 선형 변환

    def forward(self, x):
        # 활성화 함수(ReLU)를 적용 후 두 번째 선형 변환
        return self.fc2(F.relu(self.fc1(x)))

# Transformer Layer with Residual Connections, Dropout, and Layer Normalization
# 트랜스포머의 기본 층을 구현, 여기서 residual connection, dropout, layer normalization 적용
class TransformerLayer(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim, dropout_prob=0.1):
        super(TransformerLayer, self).__init__()
        self.mha = MultiHeadAttention(embed_dim, num_heads)  # Multi-Head Attention 모듈 사용
        self.ffn = FeedForwardNetwork(embed_dim, hidden_dim)  # 피드포워드 네트워크 모듈 사용
        self.norm1 = nn.LayerNorm(embed_dim)  # 첫 번째 레이어 노멀라이제이션
        self.norm2 = nn.LayerNorm(embed_dim)  # 두 번째 레이어 노멀라이제이션
        self.dropout = nn.Dropout(dropout_prob)  # 드롭아웃 적용

    def forward(self, x, mask=None):
        # Multi-Head Attention 적용 후 Residual Connection과 Layer Normalization
        attn_output = self.mha(x, mask)
        x = self.norm1(x + self.dropout(attn_output))  # residual connection: x + attn_output

        # Feed-Forward Network 적용 후 Residual Connection과 Layer Normalization
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output))  # residual connection: x + ffn_output

        return x

# Full Transformer model with 5 layers and 4 attention heads
# 트랜스포머 전체 모델을 5개의 층과 4개의 어텐션 헤드로 구성
class Transformer(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim, num_layers, vocab_size, dropout_prob=0.1):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)  # 입력을 임베딩하는 층
        self.layers = nn.ModuleList([
            TransformerLayer(embed_dim, num_heads, hidden_dim, dropout_prob) for _ in range(num_layers)  # 5개의 TransformerLayer로 구성
        ])
        self.fc_out = nn.Linear(embed_dim, vocab_size)  # 최종 출력 층

    def forward(self, x, mask=None):
        # 입력에 대해 임베딩 적용
        x = self.embedding(x)

        # 각 트랜스포머 층을 차례대로 통과
        for layer in self.layers:
            x = layer(x, mask)

        # 마지막에 분류기를 적용하여 단어 예측
        return self.fc_out(x)

# Example usage
if __name__ == "__main__":
    batch_size = 32
    seq_len = 20
    vocab_size = 10000
    embed_dim = 512
    num_heads = 4
    hidden_dim = 2048
    num_layers = 5

    model = Transformer(embed_dim, num_heads, hidden_dim, num_layers, vocab_size)
    input_data = torch.randint(0, vocab_size, (batch_size, seq_len))

    output = model(input_data)
    print(output.shape)  # Expected shape: [batch_size, seq_len, vocab_size]


torch.Size([32, 20, 10000])
