<a href="https://colab.research.google.com/github/byeongdon/hanghae99/blob/main/2%EC%A3%BC%EC%B0%A8_%EC%8B%AC%ED%99%94%EA%B3%BC%EC%A0%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Multi-head Attention으로 감정 분석 모델 구현하기**

🔍 Multi-head Attention이란?

Multi-head Attention은 Self-Attention 메커니즘을 여러 번 병렬로 수행하는 방식입니다. 간단히 아래 개념으로 이해할 수 있어요:

Self-Attention은 단어 하나가 문장의 다른 단어들과 얼마나 관련 있는지를 파악합니다.

이때 단 하나의 attention만 사용하면, 정보의 일부분만 포착할 수 있어요.

그래서 여러 개의 attention "head"를 사용해 각기 다른 관점에서 정보를 파악합니다.

🧠 왜 여러 개의 head가 필요할까?

하나의 head만 사용할 경우, 문장의 의미나 문맥을 단순하게 해석할 수밖에 없어요.

여러 head를 사용하면 문법적 관계, 의미적 유사성, 위치 정보 등 다양한 정보를 병렬로 파악할 수 있습니다.

이렇게 얻은 다양한 정보를 하나로 합쳐서 더 풍부하고 정밀한 문장 표현을 만들어냅니다.

📘 비유로 이해해볼까요?

"책을 읽을 때, 각 사람은 다른 관점에서 내용을 이해하죠."

어떤 사람은 줄거리에 집중하고,

어떤 사람은 문장 표현에 주목하고,

또 어떤 사람은 등장인물 간의 관계에 집중합니다.

👉 Multi-head Attention은 이런 식으로 여러 사람이 동시에 문장을 다른 관점에서 이해하는 것과 같아요.

In [22]:
!pip install datasets sacremoses



In [23]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


# ds = load_dataset("stanfordnlp/imdb")
train_ds = load_dataset("stanfordnlp/imdb", split="train[:5%]")
test_ds = load_dataset("stanfordnlp/imdb", split="test[:5%]")

tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


Multi-Head-Attention으로 구현하기 위해서 SelfAttention을 여러 개 구현

split_heads 함수는 Multi-head attention에서 입력 텐서를 여러 개의 head로 분할하는 역할을 합니다.

함수 설명:

입력:

x: 입력 텐서. shape은 (batch_size, sequence_length, d_model) 입니다.
batch_size: 배치 크기.
기능:

입력 텐서 x를 num_heads 개의 head로 분할합니다.
분할된 각 head는 depth 차원을 갖습니다. (depth = d_model / num_heads)
분할 후, 텐서의 shape을 (batch_size, num_heads, sequence_length, depth)로 변경합니다.
출력:

분할되고 shape이 변경된 텐서.
코드 분석:


x = x.view(batch_size, -1, self.num_heads, self.depth)
Use code with caution
view 함수를 사용하여 입력 텐서 x의 shape을 변경합니다.
batch_size는 그대로 유지됩니다.
-1은 sequence_length를 자동으로 계산하도록 지정합니다.
self.num_heads는 head의 개수입니다.
self.depth는 각 head의 depth입니다.

return x.permute(0, 2, 1, 3)
Use code with caution
permute 함수를 사용하여 텐서의 차원 순서를 변경합니다.
(batch_size, sequence_length, num_heads, depth)에서 (batch_size, num_heads, sequence_length, depth)로 변경됩니다.
요약:

split_heads 함수는 Multi-head attention에서 입력 텐서를 여러 head로 분할하고, 각 head가 독립적으로 attention을 계산할 수 있도록 shape을 변경하는 역할을 합니다. 이를 통해 모델은 입력 시퀀스의 여러 부분에 동시에 attention을 줄 수 있습니다.

In [24]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

def scales_dot_product_attention(q, k, v, mask):
  """Scaled Dot-Product Attention 계산 함수"""
  matmul_qk = torch.matmul(q, k.transpose(-2, -1))  # (..., seq_len_q, seq_len_k)

  # 스케일링
  dk = torch.tensor(k.size(-1), dtype=torch.float32)
  scaled_attention_logits = matmul_qk / torch.math.sqrt(dk)

  # 마스크 적용 (옵션)
  if mask is not None:
    scaled_attention_logits += (mask * -1e9)

  # 소프트맥스 함수를 사용하여 attention 가중치 계산
  attention_weights = torch.nn.functional.softmax(scaled_attention_logits, dim=-1)
                                                                # (..., seq_len_q, seq_len_k)
  output = torch.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

  return output, attention_weights

class MultiHeadAttention(nn.Module): # MHA 구현을 위해 SelfAttention 코드 복사
  def __init__(self, input_dim, d_model, num_heads):
    super().__init__()

    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0 # d_model이 num_heads로 나누어떨어지는지 확인

    self.depth = d_model // self.num_heads  # 각 head의 depth 계산

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)
  def split_heads(self, x, batch_size):
      """입력 텐서를 여러 head로 분할"""
      x = x.view(batch_size, -1, self.num_heads, self.depth)
      return x.permute(0, 2, 1, 3)

  def forward(self, q, k, v, mask): # attention을 계산하고 출력을 반환
    batch_size = q.size(0)

    q, k, v = self.wq(q), self.wk(k), self.wv(v)
    q, k, v = self.split_heads(q, batch_size), self.split_heads(k, batch_size), self.split_heads(v, batch_size)

    scaled_attention, attention_weights = scales_dot_product_attention(q, k, v, mask)

    scaled_attention = scaled_attention.permute(0, 2, 1, 3)  # (batch_size, seq_len_q, num_heads, depth)

    concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model)  # (batch_size, seq_len_q, d_model)

    output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

    return output, attention_weights

In [25]:
class TransformerLayer(nn.Module):
    def __init__(self, input_dim, d_model, dff, num_heads): # num_heads 추가
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.dff = dff

        self.mha = MultiHeadAttention(input_dim, d_model, num_heads) # MHA로 변경
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model)
        )

        self.dropout1 = nn.Dropout(0.1) # Dropout 추가
        self.dropout2 = nn.Dropout(0.1) # Dropout 추가
        self.layernorm1 = nn.LayerNorm(d_model) # Layer Normalization 추가
        self.layernorm2 = nn.LayerNorm(d_model) # Layer Normalization 추가

    def forward(self, x, mask):
        attn_output, _ = self.mha(x, x, x, mask)  # MHA 호출, Q, K, V는 모두 x 사용
        attn_output = self.dropout1(attn_output) # Dropout 적용
        out1 = self.layernorm1(x + attn_output)  # Residual connection + Layer Normalization

        ffn_output = self.ffn(out1)  # FFN 적용
        ffn_output = self.dropout2(ffn_output) # Dropout 적용
        out2 = self.layernorm2(out1 + ffn_output)  # Residual connection + Layer Normalization

        return out2

In [26]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


In [27]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff, num_heads):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff, num_heads) for _ in range(n_layers)])
    self.classification = nn.Linear(d_model, 1)

  def forward(self, x):
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :]
    seq_len = x.shape[1]

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0]
    x = self.classification(x)

    return x


model = TextClassifier(len(tokenizer), 32, 5, 32, 4)

In [28]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [29]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [30]:
n_epochs = 5

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

RuntimeError: The size of tensor a (4) must match the size of tensor b (64) at non-singleton dimension 1