## 스케일드 점곱 어텐션

### 스케일드 점곱 계산 절차
- X = Query * Key (N x N 행렬 생성)
- X = softmax(X / sqrt(d(model) / N(head)))
- X_output = X * Value

In [168]:
from transformers import AutoTokenizer

In [169]:
model_ckpt = 'bert-base-uncased'
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

In [170]:
text = 'time flies like an arrow'

In [171]:
# tokenize
inputs = tokenizer(text, return_tensors='pt', add_special_tokens=False)

In [172]:
from torch import nn
from transformers import AutoConfig

In [173]:
# 토큰 임베딩
config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)

In [174]:
input_embs = token_emb(inputs.input_ids)
input_embs.size()

torch.Size([1, 5, 768])

In [175]:
import torch
from math import sqrt

query = key = value = input_embs
dim_k = key.size(-1)

# 배치 행렬곱을 통해 어텐션 점수 출력 (hidden_dim 의 제곱근만큼 크기로 정규화 진행)
scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
scores.size()

torch.Size([1, 5, 5])

In [176]:
# scores에 softmax 적용
import torch.nn.functional as F

weights = F.softmax(scores, dim = -1)
weights.sum(dim = -1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [177]:
# Key와 행렬곱 하여 점곱 어텐션 출력값을 생성
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

## Scaled dot product Attention 과정을 하나의 함수로 표현

In [179]:
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(query: torch.Tensor, key: torch.Tensor, value: torch.Tensor) -> torch.Tensor:
    hidden_dim_size = query.size(-1)

    # (Q * K_T) / sqrt(hidden_dim_size)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(hidden_dim_size)
    # Softmax
    weights = F.softmax(scores, dim = -1)
    # output = weights * value
    return torch.bmm(weights, value)

## Multi-Head Attention

- 이전 Scaled-dot Product Attention 계산 절차와 다르게, Query, Key, Value 각각을 선형변환한 채로 Attention에 넣는 것이 차이
- 이러한 선형변환은 여러벌로 구성되는데, 각 벌마다 집중하는 문맥이 다르기 때문에, 예측의 풍부함을 부여하는 효과를 불러옴

In [181]:
# Attention Head 구현
import torch
from torch import nn

class AttentionHead(nn.Module):
    def __init__(self, embed_dim: int, head_dim: int):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    # (batch_size, seq_len, head_dim) 사이즈의 텐서를 리턴
    def forward(self, hidden_state: torch.Tensor) -> torch.Tensor:
        query_output, key_output, value_output = self.q(hidden_state), self.k(hidden_state), self.v(hidden_state)
        return scaled_dot_product_attention(query=query_output, key=key_output, value=value_output)

In [182]:
# Multi-head Attention 구현
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim, num_heads = config.hidden_size, config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state: torch.Tensor) -> torch.Tensor:
        x = torch.cat([attention_head(hidden_state) for attention_head in self.heads], dim = -1)
        x = self.output_linear(x)
        return x

In [183]:
multi_head_attn = MultiHeadAttention(config)
attn_output = multi_head_attn(input_embs)
attn_output.size()

torch.Size([1, 5, 768])

## FeedForward Layer 구현

In [185]:
# 피드포워드 층 구현
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    # (batch_size, seq_len, hidden_state) 크기 그대로 feed_forward 시켜서 반환
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [186]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

torch.Size([1, 5, 768])

## Transformer Encoder Layer 구현

In [188]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 사전 층 정규화를적용하고 입력을 쿼리, 키, 값 으로 복사합니다
        hidden_state = self.layer_norm_1(x)
        # 어텐션에 Gradient Highway 적용
        x = x + self.attention(hidden_state)
        # Gradient Highway와 FeedForward 적용
        hidden_state = self.feed_forward(self.layer_norm_2(x))
        return x + hidden_state

In [189]:
# 테스트
encoder_layer = TransformerEncoderLayer(config)
input_embs.shape, encoder_layer(input_embs).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

## Transformer Token Embedding Layer 구현
- 토큰 임베딩과 위치 임베딩을 더하여 Transformer 임베딩 계층 구현

In [208]:
class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.positional_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        # 입력 시퀀스에 대해서 위치 ID를 만듭니다
        seq_length = input_ids.size(1)
        positional_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # 토큰 임베딩과 위치 임베딩을 만듭니다
        token_embeddings = self.token_embeddings(input_ids)
        positional_embeddings = self.positional_embeddings(positional_ids)
        # 토큰 임베딩과 위치 임베딩을 합칩니다 (Add/Norm + Dropout)
        embeddings = token_embeddings + positional_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [218]:
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

## 트랜스포머 인코더 생성
- 임베딩 계층과 인코더 스택을 결합하여 트랜스포머 인코더를 생성

In [223]:
class TransformerEncoder(nn.Module):
    def __init__(self, config: dict):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [225]:
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

## 인코더 단독으로 사용하는 경우 분류 헤드 추가
- BERT 모델과 같은 텍스트 분류 문제는 인코더에 분류 헤드를 붙이는 형식으로 사용함

In [228]:
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config: dict):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # [CLS] 토큰의 은닉 상태를 선택
        x = self.encoder(x)[:, 0, :]
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [230]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])