In [1]:
# Transformer 구조 이해 및 모델 구축
# 학습 목표
# - 1. Encoder
# - Scaled Dot-Product Attention
# - Multi-Head Attention
# - Transformer Encoder Block(Attention -> FFN -> Residual -> LayerNorm 구조)
# - Positional Encoding
# - Transformer Encoder
# - 2. Decoder
# - Masked Multi-Head Attention
# - Cross Attention

In [15]:
# 데이터 전처리 - 라벨링된 코퍼스, 라벨링된 코퍼스(긍정=1, 부정=0)
corpus = [
    ("안녕하세요 오늘은 날씨가 맑습니다", 1),   # 긍정
    ("저는 자연어 처리를 공부하고 있습니다", 1), # 긍정
    ("임베딩은 단어를 벡터로 표현하는 방법입니다", 1), # 긍정
    ("파이토치는 딥러닝을 위한 강력한 라이브러리입니다", 1), # 긍정
    ("언어 모델은 다음 단어를 예측하는 방식으로 학습합니다", 1), # 긍정
    ("작은 데이터셋으로도 실험을 시작할 수 있습니다", 1), # 긍정
    ("머신러닝은 데이터를 통해 패턴을 학습합니다", 1), # 긍정
    ("딥러닝은 인공신경망을 기반으로 합니다", 1), # 긍정
    ("토큰화는 문장을 단어 단위로 나누는 과정입니다", 1), # 긍정
    ("모델은 입력을 받아 출력을 생성합니다", 1), # 긍정
    ("하이퍼파라미터는 학습 성능에 큰 영향을 줍니다", 0), # 부정 (실험용)
    ("에포크는 전체 데이터셋을 한 번 학습하는 단위를 의미합니다", 0), # 부정
    ("배치 크기는 한 번에 처리하는 샘플 수입니다", 0), # 부정
    ("손실 함수는 모델의 예측과 정답의 차이를 측정합니다", 0), # 부정
    ("옵티마이저는 파라미터를 업데이트하는 알고리즘입니다", 0), # 부정
    ("학습률은 파라미터를 얼마나 크게 조정할지 결정합니다", 0), # 부정
    ("정규화는 과적합을 방지하는 방법입니다", 0), # 부정
    ("드롭아웃은 일부 뉴런을 무작위로 끊어 학습을 안정화합니다", 0), # 부정
    ("GPU는 대규모 연산을 빠르게 수행할 수 있습니다", 1), # 긍정
    ("실험을 반복하면 더 나은 결과를 얻을 수 있습니다", 1) # 긍정
]

In [16]:
# 데이터 전처리 - tokenizer, Dataset 생성, DataLoader 생성, -> 학습데이터 생성
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer

# AutoTokenizer 모델 로드
tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased") # bert-base-multilingual-cased 모델 vocab_size 119547

# Custom Dataset 로드
class TextDataset(Dataset):
    def __init__(self, corpus, tokenizer, max_len=128):
        self.sentences = [ c[0] for c in corpus ] # 문장
        self.labels = [ c[1] for c in corpus ] # 라벨
        self.encodings = tokenizer( # AutoTokenizer 사용해 문장을 토큰화
            self.sentences,
            padding='max_length', # 문장 길이 부족하면, <PAD> 로 채운다
            truncation=True, # 문장일 길면 자른다
            max_length=max_len, # 문장 길이 128
            return_tensors='pt' # 토큰화 결과(input_ids, attention_mask)를 PyTorch 텐서로 저장
        )
    
    def __len__(self): # 전체 데이터셋의 크기를 반환
        return len(self.labels)
    
    def __getitem__(self, idx):
        # self.encodings.items() 에서 input_ids, attention_mask 추출 -> item 적재
        item = { key: val[idx] for key, val in self.encodings.items() }
        # self.labels[idx] 에서 labels 추출 -> item 적재
        item['labels'] = torch.tensor(self.labels[idx])
        return item

# Dataset
dataset = TextDataset(corpus=corpus, tokenizer=tokenizer)
# print(dataset.encodings)

# DataLoader
dataLoader = DataLoader(
    dataset=dataset,
    batch_size=4,
    shuffle=True
)

# 결과 확인
for batch in dataLoader:
    print(batch['input_ids'].shape, batch['attention_mask'].shape, batch['labels'].shape)
    # print(batch)
    break

torch.Size([4, 128]) torch.Size([4, 128]) torch.Size([4])


In [17]:
# Scaled Dot-Product Attention
# - Query ='오늘은' 
# - Attention 확률이 '날씨'와 '맑습니다'에 높게 나오면, "날씨"(0.6), "맑습니다"(0.3), "오늘은"(0.1)
# - out("오늘은") = 0.6*V("날씨") + 0.3*V("맑습니다") + 0.1*V("오늘은")
# - '오늘은'의 최종 표현 벡터는 '날씨가 맑다'라는 맥락을 반영한 새로운 벡터로 업데이트 한다
import torch.nn.functional as F

# Scaled Dot-Product Attention 함수
def scaled_dot_product_attention(Q, K, V, mask=None):
    # Query와 Key의 내적(dot product)을 통해 유사도 점수를 계산
    # - Q.shape (batch, num_heads, seq_len, d_k), K.transpose(-2, -1).shape (batch, num_heads, d_k, seq_len) -> 유사도 행렬 계산
    # - Q.size(-1) 마지막 차원 크기 d_k, **5 제곱근 연산 -> d_k로 나누어 스케일링 계산
    scores = torch.matmul(Q, K.transpose(-2, -1)) / (Q.size(-1) ** 0.5) # (batch, num_heads, seq_len, seq_len)
    if mask is not None:
        if mask.dtype == torch.bool: # True인 위치
            scores = scores.masked_fill(mask, float('-inf')) # True 인 위치를 -inf로 채워 Softmax에서 확률이 0이 되도록 한다
        else:
            scores = scores + mask # mask 값을 유사도 점수에 더해준다
    
    # 유사도 행렬을 확률 분포로 변환, Query가 Key 전체 중 어디에 집중할지 확률로 표현
    attn = F.softmax(scores, dim=-1)

    # Attention 확률(attn)을 Value(V)에 곱해 최종 컨텍스트 벡터 생성, (batch,seq_len,d_v)
    # 각 Query 토큰은 Key와의 유사도에 따라 Value 정보를 섞어서 새로운 표현을 얻게 된다
    out = torch.matmul(attn, V) # (batch, num_heads ,seq_len,d_v)
    return out, attn # out Attention 결과(컨텍스트 벡터), attn Attention 확률 분포(시각화할때 유용)

In [22]:
# Multi-Head Attention(MHA), Scaled Dot-Product Attention을 여러개의 Head로 병렬 처리하는 구조
class MultHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0 # d_model은 num_heads로 나누어 떨어져야 한다

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads # 각 head의 차원

        # Head별 Q, K, V 변환을 위한 선형 레이어
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # 모든 head 결합 후 최종 출력 변환
        self.W_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)
    
    # Head 나누기
    def _split_heads(self, x):
        # (batch, seq_len, d_model) -> (batch, num_heads, seq_len, d_k)
        b, L, _ = x.size()
        return x.view(b, L, self.num_heads, self.d_k).transpose(1,2)
    
    # Head 결과를 합치기
    def _combine_heads(self, x):
        # (batch, num_heads, seq_len, d_k) -> (batch, seq_len, d_model)
        b, h, L, d_k = x.size()
        return x.transpose(1,2).contiguous().view(b, L, h * d_k)
        
    def forward(self, x_q, x_kv=None, mask=None):
        # Self-Attenton : x_q만 입력(x_kv=None)
        # Cross-Attention : x_q, x_kv 모두 입력
        if x_kv is None:
            # Self-Attention
            Q = self._split_heads(self.W_q(x_q))
            K = self._split_heads(self.W_k(x_q))
            V = self._split_heads(self.W_v(x_q))
        else:
            # Cross-Attention
            Q = self._split_heads(self.W_q(x_q))
            K = self._split_heads(self.W_k(x_kv))
            V = self._split_heads(self.W_k(x_kv))

        # 각 head별 Attention 수행
        context, attn = scaled_dot_product_attention(Q, K, V, mask=mask)

        # head 결합
        context = self._combine_heads(context)

        # 최종 선형 변환
        out = self.W_o(self.dropout(context))
        return out, attn

In [23]:
# Encoder Block 구조
# - Encoder Block 은 Attention -> FFN -> Residual -> LayerNorm 구조
# - 여러 Block을 쌓으면 Transformer Encoder 완성, 이 블록은 문맥을 깊게 이해하는 핵심 모듈

class TransformerEncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()

        # Multi-Head Attention, 단어 간 관계 학습 - 문맥 반영
        self.attn = MultHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        # Feed Forward Network, 각 단어 벡터 자체를 비선형 변환 - 표현력 강화
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff), # 차원 확장, 예시) 차원 확장을 하기위해 d_model=512, d_ff=2048 
            nn.ReLU(), # 또는 GELU
            nn.Linear(d_ff, d_model) # 다시 원래 차원 축소
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)
    
    def forward(self, x, mask=None): # x (batch,seq_len,d_model)
        # 1. Multi-Head Attention + Residual + Norm
        attn_out, _ = self.attn(x, mask) # 입력 x를 Attention에 넣어 문맥 반영된 출력 attn_out을 얻음
        x = self.norm1(x + self.dropout1(attn_out)) # Residual 연결 x + attn_out

        # 2. Feed Forward + Residual + Norm
        ffn_out = self.ffn(x) # Attention 결과를 FFN 넣어 토큰 자체 표현 강화
        x = self.norm2(x + self.dropout2(ffn_out)) # Residual 연결 x + ffn_out

        return x

In [24]:
# Positional Encoding 테스트
import torch
import math

d_model = 8
max_len =5

# 위치 인덱스(0~4)
position = torch.arange(0, max_len).unsqueeze(1) # shape(5,1)

# 차원별 주파수 스케일
div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))

# position * div_term -> (5,4) 행렬
pos_div = position * div_term

print('position : ', position, position.shape)
print('div_term : ', div_term, div_term.shape)
print('position * div_term : ', pos_div, pos_div.shape)

position :  tensor([[0],
        [1],
        [2],
        [3],
        [4]]) torch.Size([5, 1])
div_term :  tensor([1.0000e+00, 1.0000e-01, 1.0000e-02, 1.0000e-03]) torch.Size([4])
position * div_term :  tensor([[0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [1.0000e+00, 1.0000e-01, 1.0000e-02, 1.0000e-03],
        [2.0000e+00, 2.0000e-01, 2.0000e-02, 2.0000e-03],
        [3.0000e+00, 3.0000e-01, 3.0000e-02, 3.0000e-03],
        [4.0000e+00, 4.0000e-01, 4.0000e-02, 4.0000e-03]]) torch.Size([5, 4])


In [25]:
# Encoder 구조
# - Embedding(단어->벡터) -> Positional Encoding(벡터 + 순서 정보 추가) -> Encoder Blocks(문맥 반영 + 표현 강화)
import math

# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()

        pe = torch.zeros(max_len, d_model) # 예시 (5000, 16) 크기의 0의 행렬
        position = torch.arange(0, max_len).unsqueeze(1) # 0~4999 단어의 위치 인덱스 생성, (max_len, 1) 2차원 행렬 차원 추가
        # torch.arange(0, d_model, 2) 0,2,4,~,14 짝수 인덱스 추출
        # -(math.log(10000.0) 위치 인덱스를 스케일링하기 위한 상수, 10000 포지셔널 인코딩에서 사용되는 기준값
        # d_model 로 나누어 차원 크기를 맞게 조정, 즉 차원마다 다른 스케일 다른 주파수를 가지게 된다
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term) # 짝수 sin
        pe[:, 1::2] = torch.cos(position * div_term) # 홀수 cos
        pe = pe.unsqueeze(0) # 입력값 x와 shape을 맞추기 위함 (batch,seq_len,d_model)
        self.register_buffer('pe', pe) # 학습되지 않는 텐서를 저장, gradient 파라미터 업데이트가 되지 않음
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# Encoder
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([
            TransformerEncoderBlock(d_model, num_heads, d_ff)
            for _ in range(num_layers) # num_layers 지정 수 만큼 반복해서 블록을 생성
        ])
    
    def forward(self, x): # 문장 -> 토큰화 입력값 x (batch, seq_len)
        x = self.embedding(x) # 토큰화 ID d_model 차원의 벡터, embedding shape (batch, seq_len, d_model)
        x = self.pos_encoding(x) # 벡터 + 위치정보(sin/cos 패턴), pos_encoding shape (batch, seq_len, d_model)
        for layer in self.layers: # Multi-Head Attention 문맥 반영, Feed Forward Network 표현 강환, Residual + LayerNorm 안정적 학습
            x = layer(x) # TransformerEncoderBlock shape (batch, seq_len, d_model)
        return x

# 작은 입력 (batch=1, seq_len=5)
tokens = torch.tensor([[1,2,3,4,5]])
encoder = TransformerEncoder(vocab_size=100, d_model=16, num_heads=4, d_ff=32, num_layers=2)
output = encoder(tokens)
print(output.shape)

torch.Size([1, 5, 16])


In [26]:
# Masked Multi-Head Attention
# - Decoder Attention, 미래 토큰을 보면 안됨, 마스크(mask) 로 가림, 예시) [I, love, ___] -> 다음 단어 예측시 뒤 단어는 가려져야 함
# - 구현에서는 Upper-traiangular mask를 만들어서 현재 시점 이후의 값들을 -inf로 처리 -> softmax에서 0으로 됨
# - Decoder는 왼쪽만 보고 오른쪽을 예측하는 구조이다
class MaskedMultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1): # d_model 입력 벡터 차원, num_heads 어텐션 헤드 개수, dropout 드롭아웃 비율
        super().__init__()
        self.attn = MultHeadAttention(d_model, num_heads, dropout)
    
    def forward(self, x):
        # 입력값 x (batch,seq_len,d_model)
        seq_len = x.size(1) # seq_len 의 문장 길이

        # 마스크 생성(상삼각 행렬), 대각선 위로만 1 나머지는 0, bool() True/False 형태로 반환
        # 예시) mask = False 볼 수 있는 부분(현재/과거 토큰), True 가려지는 부분(미래 토큰)
        # [[False  True  True  True  True ], - 1번째 토큰은 자기 자신만 보고, 나머지 2~5는 모두 가려짐
        # [False False  True  True  True ], - 2번째 토큰은 자기 자신 + 1번째 토큰만 보고, 3~5는 가려짐
        # [False False False  True  True ], - 3번째 토큰은 자기 자신 + 1,2번째 토큰만 보고, 4~5는 가려짐
        # [False False False False  True ], - 4번째 토큰은 자기 자신 + 1,2,3번째 토큰만 보고, 5는 가려짐
        # [False False False False False]]
        mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()

        # mask : (seq_len, seq_len), True = 가려질 부분
        attn_out, _ = self.attn(x, mask=mask)
        return attn_out

In [None]:
# Cross Attention
# - Decoder는 자기 자신 토큰들끼리만 참고 한다, 미래 토큰은 마스크 처리되어 볼 수 없기 때문에, 독자적으로는 전체 의미를 완전히 알 수 없다
# - Cross Attention은 Decoder가 Encoder 출력값(Key/Value)를 참고 한다, 이렇게 해야 Decoder가 입력 문장의 의미를 반영 할 수 있다
# - Q = Decoder 상태, K/V = Encoder 출력
# - 즉, Decoder 혼자서는 불완전한 정보만 갖고 있으므로, Encoder의 의미 벡터를 참고해서 문장을 생성한다

class CrossAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        self.mha = MultHeadAttention(d_model, num_heads, dropout)

    def forward(self, x_dec, enc_out, mask=None):
        # x_dec는 Decoder에서, enc_out는 Encoder에서 가져옴
        out, attn = self.mha(x_dec, x_kv=enc_out, mask=mask)
        return out, attn