## Self-Attention 구현

하나의 시퀀스 내에서 서로 다른 위치들 간의 관계를 계산하여 시퀀스의 표현을 만들어내는 매커니즘으로, 'Scaled Dot-Product Attention' 이라고 부른다.

### 수식

$$Attention(Q, K, V) = softmax(QK^T / √d_k)V$$

### 동작 원리:
- Q(Query) 와 K(Key)의 내적으로 유사도 계산
- $√d_k$로 스케일링
- softmax로 attention 가중치 계산
- V(Value)에 가중치를 내적해 weighted sum

### + 마스크드 어텐션(masked attention)

마스크드 어텐션(masked attention)이란, self-attention 계산 과정에서 특정 토큰이 아예 선택되지 않도록 attention score 단계에서 차단하는 메커니즘을 의미한다. 구체적으로, 문장 길이를 맞추기 위해 추가된 padding 토큰이나, 디코더에서 아직 생성되지 않은 미래 토큰에 대해 어텐션 점수(QKᵀ)에 매우 작은 값(−∞에 해당하는 값)을 더해 softmax 이전에 제거함으로써, softmax 이후 해당 위치의 가중치가 정확히 0이 되도록 한다.

이 방식은 단순히 출력이나 pooling 단계에서 토큰을 무시하는 것과 달리, 문맥 정보가 섞이는 단계 자체를 원천적으로 차단한다는 점에서 중요하며, 인코더에서는 padding mask로 불필요한 토큰을 배제하고, 디코더에서는 여기에 causal mask를 추가해 미래 정보 누설을 방지함으로써 Transformer가 올바른 문맥과 순서 정보를 학습하도록 한다.

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [3]:
class SelfAttention(nn.Module):
  def __init__(self, embed_dim):
    super().__init__()
    self.embed_dim = embed_dim

    # Q, K, V 변환 행렬
    # 입력된 단어 벡터 x를 각각 Q, K, V라는 목적에 맞는
    # 새로운 벡터로 선형 투영(Linear Projection)하는 역할을 한다.
    self.query = nn.Linear(embed_dim, embed_dim)
    self.key = nn.Linear(embed_dim, embed_dim)
    self.value = nn.Linear(embed_dim, embed_dim)

    # Scaling factor
    # 내적 값이 너무 커져서 기울기 소실되는 것을 막기 위함
    self.scale = math.sqrt(embed_dim)

  def forward(self, x, mask=None):
    # 동일한 x로부터 Q, K, V 생성
    Q = self.query(x)
    K = self.key(x)
    V = self.value(x)

    # Q 행렬과 K 전치 행렬의 내적을 구하고
    # 스케일링 인자로 나눠
    # Attention Scores 계산
    scores = torch.matmul(Q, K.transpose(-2, -1) / self.scale)

    # Masking (옵션)
    if mask is not None:
      # mask가 0인 위치를 -inf로 설정 (softmax 후 0이 됨)
      scores = scores.masked_fill(mask == 0, float('-inf'))

    # Softmax로 Attention 가중치 계산
    # dim = -1, 질문의 받은 각 키들의 가중치
    # 한 단어(Q)가 문장 내 전체 단어들(K)에 대해 가지는 가중치의 합을 1로 맞추기 위한 것
    attention_weights = F.softmax(scores, dim=-1)

    # Value에 가중치 적용
    output = torch.matmul(attention_weights, V)

    return output, attention_weights

## Multi-Head Attention 구현

Self-Attention을 병렬로 여러 번 수행하는 구조이다. 모델이 서로 다른 위치에 있는 **다양한 표현 하위 공간(representation subspaces)으로부터 공동으로 정보를 참조** 할 수 있게 하며, 단일 어텐션에서 정보가 평균화되어 세밀한 특징이 억제되는 것을 방지한다. 마치 여러 명의 전문가가 각자 다른 관점에서 문장을 분석하는 것과 유사하다.

### 수식

$$
\begin{align*}
\operatorname{MultiHead}(Q, K, V) &= \operatorname{Concat}(\text{head}_1, \dots, \text{head}_h) W^O \\
\text{where } \text{head}_i &= \operatorname{Attention}(QW_i^Q, KW_i^K, VW_i^V)
\end{align*}
$$

### 핵심 아이디어

- 여러 개의 attention head를 병렬로 수행
- 각 head는 다른 representation subspace에서 정보 추출
- 예: head1은 문법, head2는 의미, head3은 위치 관계 등

In [5]:
class MultiHeadAttention(nn.Module):
  def __init__(self, embed_dim, num_heads):
    super().__init__()
    # assert [조건], [에러 메시지]
    assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"

    self.embed_dim = embed_dim
    self.num_heads = num_heads
    self.head_dim = embed_dim // num_heads

    # Q, K, V를 각각 리니어 층을 만들지 않고, 한 번에 처리
    # where head_i 부분
    self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)

    # Output projection
    # 출력 할 때는 다시 정확히 embed_dim 크기로 내보냄
    # W^O 부분
    self.out_proj = nn.Linear(embed_dim, embed_dim)

    self.sale = math.sqrt(self.head_dim)

  def forward(self, x, mask=None):
    batch_size, seq_len, embed_dim = x.size()

    # Q, K, V 계산 및 reshape
    qkv = self.qkv_proj(x)
    # 3: Q, K, V로 구분
    # num_heads: 멀리 헤드로 나누기 위함
    # head_dim: 각 헤드의 실제 차원, -1로 대체 가능
    qkv = qkv.reshape(batch_size, seq_len, 3, self.num_heads, self.head_dim)
    # 원래 순서 (0:batch, 1:seq, 2:qkv, 3:heads, 4:dim)
    # 바뀐 순서 (2:qkv, 0:batch, 3:heads, 1:seq, 4:dim)
    # 병렬 연산 최적화를 하기 위해 가장 앞차원에 Q, K, V를 빼냄
    # 그 다음 batch와 heads를 앞으로 보냄으로써, 각 배치별, 각헤드별 어텐션 연산(행렬곱)을 하기 위함
    qkv = qkv.permute(2, 0, 3, 1, 4)
    Q, K, V = qkv[0], qkv[1], qkv[2]

    # Scaled Dot-Product Attention
    scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
    # scores: (batch_size, num_heads, seq_len, seq_len)

    # Masking
    if mask is not None:
      if mask.dim() == 3:
        # 차원을 늘려 차원의 개수를 맞춰줌
        mask = mask.unsqueeze(1) # (batch_size, 1, seq_len, seq_len)
      scores = scores.masked_fill(mask == 0, float('-inf'))

    # Softmax
    attention_weights = F.softmax(scores, dim=-1)

    # Attention 적용
    attn_output = torch.matmul(attention_weights, V)
    # attn_output: (batch_size, num_heads, seq_len, head_dim)

    # Heads 결합
    # 헤드별이 아니라, 단어별로 헤드 정보가 모아져야 하기에 transpose 사용
    # contiguous(): 바뀐 모양에 맞춰서 메모리의 데이터 배치 자체를 새로 정렬하여 복사본을 만듦
    attn_output = attn_output.transpose(1, 2).contiguous()
    # attn_ouput: (batch_size, seq_len, num_heads, head_dim)
    # Concat(head_1,...,head_n) 부분
    # embed_dim = num_heads * head_dim
    attn_output = attn_output.reshape(batch_size, seq_len, embed_dim)

    # Output projection
    # output = nn.Linear(embed_dim, embed_dim)(attn_output)
    output = self.out_proj(attn_output)

    return output, attention_weights

## Positional Encoding 구현

Transformer는 순환 구조가 없기 때문에 발생하는 **순서 정보의 부재** 를 해결하기 위한 기술이다. Positional Encoding은 각 토큰의 위치 정보를 사인(sine)과 코사인(cosine) 함수를 사용하여 토큰의 상대적 또는 절대적 위치 정보를 입력 임베딩에 주입한다.

### 수식

$$PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d_{model}})$$$$PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i/d_{model}})$$

### 왜 sin/cos?

- 상대적 위치를 선형 변환으로 표현 가능
  PE(pos + k) = f(PE(pos), k)
- 임의의 긴 시퀀스에도 일반화
- 각 차원마다 다른 주기 -> 고유한 위치 표현

In [6]:
def __init__(self, embed_dim, max_len=5000, dropout=0.1):
  super().__init__()
  self.dropout = nn.Dropout(p=dropout)

  # Positional encoding 미리 계산
  # 빈 공간 확보
  pe = torch.zeros(max_len, embed_dim)
  # 위치 정보를 계산하기 위한 기본 숫자 열 생성
  # torch.arange(0, max_len, dtype=torch.float)
  # 0 ~ max_len - 1 까지의 정수를 실수 형태로 생성한다.
  # unsqueeze(1)로 차원 하나를 추가
  # 전: [0., 1., 2., 3., 4.]
  # 후: [[0.],
  #      [1.],
  #      [2.],
  #      [3.],
  #      [4.]]
  position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

  div_term = torch.exp(
      torch.arange(0, embed_dim, 2).float() *
      (-math.log(10000.0) / embed_dim)
  )

  # 짝수 인덱스: sin
  pe[:, 0::2] = torch.sin(position * div_term)
  # 홀수 인덱스: cos
  pe[:, 1::2] = torch.cos(position * div_term)

  # batch 차원 추가
  # 1: batch_size
  pe = pe.unsqueeze(0) # (1, max_len, embed_dim)

  # 학습되지 않은 파라미터로 등록
  # register_buffer(): 이 값은 학습에 필요는 없지만, 모델의 고정된 참고서
  self.register_buffer('pe', pe)

def forward(self, x):
  # x에 positional encoding 더하기
  x = x + self.pe[:, :x.size(1), :]
  return self.dropout(x)