In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score
import warnings
warnings.filterwarnings('ignore')

# **1. Positional Encoding (위치 정보 인코딩)**

## 사인·코사인 방식의 위치 인코딩

#### **개념**:
  - 트랜스포머는 시퀀스의 순서 정보를 학습하지 못하므로 위치 정보를 추가로 제공
  - PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
  - PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))

In [12]:
import torch
import torch.nn as nn
import math # numpy 대신 math나 torch 사용 권장

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_len=512):
        super().__init__()
        self.d_model = d_model

        # 위치 인코딩 행렬 생성 (최대시퀀스길이, 차원)
        pe = torch.zeros(max_seq_len, d_model)

        # 위치 인덱스
        pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)

        # 주파수 계산: 10000^(2i/d_model)
        # np.log 대신 torch.log를 사용하거나 math.log를 사용합니다.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # 짝수 인덱스: sin
        pe[:, 0::2] = torch.sin(pos * div_term)

        # 홀수 인덱스: cos
        if d_model % 2 == 1:
            pe[:, 1::2] = torch.cos(pos * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(pos * div_term)

        # 버퍼로 등록 (배치 차원 추가: 1, max_seq_len, d_model)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """
        이 함수는 반드시 __init__과 세로 줄이 맞춰져 있어야 합니다.
        x: (batch_size, seq_len, d_model)
        """
        # x의 길이에 맞춰서 위치 정보를 더해줌
        return x + self.pe[:, :x.size(1), :]

# **2. Self-Attention**

## 단일 헤드의 Self-Attention 메커니즘
    
  #### **개념**:
  - **Query(Q)**: 각 토큰이 어떤 토큰을 봐야 하는지 나타냄
  - **Key(K)**: 각 토큰이 찾아질 수 있는 특성을 나타냄
  - **Value(V)**: 실제 정보를 담음

#### **수식:**
<img src='https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSRyzcV771Etb3fBSPSFzW_hBZT1H_h9DvgrQ&s' height=120>

In [4]:
class SelfAttention(nn.Module):
  def __init__(self, d_model, d_k=None):
    super().__init__()
    # d_model: 임베딩 차원 (예: 256)
    self.d_model = d_model

    # d_k: 각 헤드의 차원 (명시하지 않으면 d_model과 같음)
    self.d_k = d_k if d_k is not None else d_model

    # Q, K, V 선형 변환
    # 입력 d_model 차원 → 출력 d_k 차원
    # 예: (배치, 시퀀스, 256) → (배치, 시퀀스, 64)
    self.linear_q = nn.Linear(d_model, self.d_k)
    self.linear_k = nn.Linear(d_model, self.d_k)
    self.linear_v = nn.Linear(d_model, self.d_k)

    # QK^T을 d_k로 나눠서 너무 크지 않도록 정규화
    # softmax가 극단값(0 또는 1)이 되지 않도록 방지
    self.scale = np.sqrt(self.d_k)

  def forward(self, query, key, value, mask=None):
    # 1. 선형변환으로 Q, K, V 생성
    # 예: (32, 20, 256) → (32, 20, 64)
    Q = self.linear_q(query)  # Query: 무엇을 찾을 것인가
    K = self.linear_k(key)    # Key: 여기 찾을 수 있는 특성이 있나
    V = self.linear_v(value)  # Value: 실제 정보

    # 2. QK^T 계산 및 스케일링
    # K.transpose(-2, -1): 마지막 2개 차원을 바꿈
    # (32, 20, 64) → (32, 64, 20)
    # 결과: 각 위치가 다른 모든 위치와 얼마나 관련있는지 점수화
    scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
    # scores: (batch_size, seq_len, seq_len)

    # 3. 마스크 적용 (선택사항)
    # 디코더에서 미래 토큰 보지 않게 하려고 사용
    # mask가 0인 곳을 -inf로 채워서 softmax에서 0이 되게 함
    if mask is not None:
      # masked_fill: 조건에 맞는 곳을 특정 값으로 채우기
      # mask == 0인 곳을 -inf로 채움
      scores = scores.masked_fill(mask == 0, float('-inf'))

    # 4. Softmax로 주의 가중치 계산
    # dim=-1: 마지막 차원(각 위치별로)에 대해 softmax
    attention_weights = torch.softmax(scores, dim=-1)
    attention_weights = torch.nan_to_num(attention_weights, 0.0)

    # 5. Value와 곱해서 최종 출력
    attention_output = torch.matmul(attention_weights, V)
    # attention_output: (batch_size, seq_len, d_k)

    return attention_output, attention_weights

# **3. Multi-Head Attention**

## 여러 개의 Self-Attention을 병렬로 실행
    
  #### **개념**:
  - 1개 헤드: 모든 정보를 256차원에 집중
  - 8개 헤드: 256차원을 8개 그룹(각 32차원)으로 나눠서
  각각 다른 "관점"에서 주목
    
  #### **예시**:
  "I like apples"을 분석할 때:
  - 헤드1: 문법 구조에 집중 (I=주어, like=동사)
  - 헤드2: 감정에 집중 (like=긍정, apples=좋은 것)
  - 헤드3: 의존성에 집중 (like←I, apples←like)


> 이렇게 다양한 관점의 정보를 모아서 더 풍부한 표현 생성

In [5]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, num_heads=8):
    super().__init__()

    # d_model이 num_heads로 나누어떨어지는지 확인
    # 예: d_model=256, num_heads=8 → 256/8=32 (OK)
    # 예: d_model=256, num_heads=7 → 256/7=36.57 (오류)
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    self.d_model = d_model  # 전체 임베딩 차원 (256)
    self.num_heads = num_heads  # 헤드 개수 (8)
    # 각 헤드의 차원: 256 / 8 = 32
    self.d_k = d_model // num_heads

    # num_heads개의 Self-Attention 레이어 생성
    # nn.ModuleList: 여러 모듈을 리스트로 관리
    # 각 헤드는 d_model 입력을 d_k 출력으로 변환
    # 예: 8개의 SelfAttention(256, 32) 생성
    self.attention_heads = nn.ModuleList(
        [SelfAttention(d_model, self.d_k) for _ in range(num_heads)]
    )

    # 모든 헤드의 출력을 연결한 후 선형변환
    # 입력: concat된 모든 헤드 출력 (d_k * num_heads = d_model)
    # 출력: d_model
    # 예: (32, 20, 256) → (32, 20, 256) (형태는 같지만 가중치 재학습)
    self.linear_out = nn.Linear(d_model, d_model)

  def forward(self, query, key, value, mask=None):
    # 1. 각 헤드별로 Self-Attention 실행
    head_outputs = []  # 각 헤드의 출력을 저장할 리스트
    all_attention_weights = []  # 각 헤드의 attention weight

    # 8개 헤드 모두 실행
    for attention_head in self.attention_heads:
        # attention_head: SelfAttention 객체
        # 반환: (attention_output, attention_weights)
        # attention_output: (32, 20, 32) - 각 헤드는 32차원 출력
        head_out, att_weights = attention_head(query, key, value, mask)

        # 출력 저장
        head_outputs.append(head_out)
        all_attention_weights.append(att_weights)

    # 2. 모든 헤드의 출력 연결 (concatenate)
    # torch.cat(list, dim=-1): 마지막 차원에서 연결
    concat_output = torch.cat(head_outputs, dim=-1)

    # 3. 선형변환으로 최종 출력 생성
    # (32, 20, 256) 을 다시 (32, 20, 256)으로 변환하지만
    # 학습 가능한 가중치를 통해 헤드들의 정보를 종합
    output = self.linear_out(concat_output)

    return output, all_attention_weights

# **4. Feed-Forward Network**

## 각 토큰에 독립적으로 적용되는 완전 연결 네트워크
    
  #### **구조**:
  - Linear(d_model -> d_ff) -> ReLU -> Linear(d_ff -> d_model)
  
  #### **예시**:
  - 입력: (32, 20, 256) - 각 토큰의 256차원 벡터
  - 확장: (32, 20, 2048) - 더 복잡한 연산을 위해 차원 확장
  - ReLU: 음수를 0으로 만들어 비선형성 추가
  - 축소: (32, 20, 256) - 다시 원래 차원으로
  
  #### **목적**:
  - Attention에서는 관계를 학습
  - FFN에서는 관계를 바탕으로 깊이 있는 표현 생성

In [6]:
class FeedForwardNetwork(nn.Module):

  def __init__(self, d_model, d_ff=2048):
      super().__init__()  # 부모 클래스 초기화
      # 첫 번째 선형층: 차원 확장
      # d_model=256 → d_ff=2048로 확장
      # 예: (32, 20, 256) → (32, 20, 2048)
      self.fc1 = nn.Linear(d_model, d_ff)
      # 두 번째 선형층: 차원 축소
      # d_ff=2048 → d_model=256으로 축소
      # 예: (32, 20, 2048) → (32, 20, 256)
      self.fc2 = nn.Linear(d_ff, d_model)
      # ReLU 활성화 함수: max(0, x)
      # 음수는 0, 양수는 그대로
      # 비선형성을 추가해서 모델이 더 복잡한 함수 학습 가능
      self.relu = nn.ReLU()

  def forward(self, x):
    # 단계 1: 선형 변환으로 차원 확장
    # (32, 20, 256) → (32, 20, 2048)
    x = self.fc1(x)

    # 단계 2: ReLU 활성화
    # 모든 음수를 0으로 변환
    x = self.relu(x)

    # 단계 3: 선형 변환으로 차원 축소
    # (32, 20, 2048) → (32, 20, 256)
    x = self.fc2(x)

    return x


# **5. Transformer Encoder Layer**

## Transformer 인코더 레이어
    
  #### **구조**:
  1. Multi-Head Attention
  2. Add & Norm (잔여 연결 + 정규화)
  3. Feed-Forward Network
  4. Add & Norm
  
  #### **시각화**:
  ```
  입력 x
    ↓
  [Multi-Head Attention]
    ↓ (attention 출력)
  [Add & Norm]: x + attention_out, LayerNorm
    ↓
  [Feed-Forward]
    ↓ (ffn 출력)
  [Add & Norm]: 위_출력 + ffn_out, LayerNorm
    ↓
  출력
  ```

  #### **목적**:
  - **Attention**: 토큰들 간의 관계 학습
  - **FFN**: 각 토큰의 표현을 깊이 있게 변환
  - **잔여 연결**: 깊은 네트워크에서도 정보 손실 방지
  - **Layer Norm**: 훈련 안정화

In [7]:
class TransformerEncoderLayer(nn.Module):
  def __init__(self, d_model, num_heads=8, d_ff=2048, dropout=0.1):
    super().__init__()

    # Multi-Head Attention 모듈
    # d_model=256, num_heads=8 → 각 헤드 32차원
    self.mha = MultiHeadAttention(d_model, num_heads)

    # Feed-Forward Network 모듈
    # 차원: d_model(256) → d_ff(2048) → d_model(256)
    self.ffn = FeedForwardNetwork(d_model, d_ff)

    # Layer Normalization: 평균을 0, 표준편차를 1로 정규화
    # 입력 d_model 차원에 대해 정규화
    # 예: (32, 20, 256)을 마지막 차원(256)에 대해 정규화
    self.norm1 = nn.LayerNorm(d_model)  # Attention 후 정규화
    self.norm2 = nn.LayerNorm(d_model)  # FFN 후 정규화

    # Dropout: 훈련 중 일부 뉴런을 무작위로 꺼서 과적합 방지
    # dropout=0.1 → 10% 확률로 끔
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, mask=None):

    # 단계 1: Multi-Head Attention + 잔여 연결 + 정규화

    # Multi-Head Attention 실행
    # 입력 x를 query, key, value로 사용 (self-attention)
    # 반환: (attention_output, attention_weights)
    mha_output, _ = self.mha(x, x, x, mask)
    # mha_output: (32, 20, 256)
    # _: attention_weights는 여기서 사용 안 함

    # Dropout 적용
    # 훈련 중 10% 확률로 일부 값을 0으로 만듦
    mha_output = self.dropout(mha_output)

    # 잔여 연결 (Residual Connection)
    # x + mha_output: 원래 입력에 attention 출력 더하기
    # 이렇게 하면 정보가 손실되지 않음
    x = x + mha_output

    # Layer Normalization
    # 평균 0, 표준편차 1로 정규화
    # 모델 훈련이 더 안정적, 빨리 수렴
    x = self.norm1(x)

    # 단계 2: Feed-Forward Network + 잔여 연결 + 정규화

    # FFN 실행
    # 이전 단계의 출력을 입력으로 사용
    ffn_output = self.ffn(x)
    # ffn_output: (32, 20, 256)

    # Dropout 적용
    ffn_output = self.dropout(ffn_output)

    # 잔여 연결
    # x + ffn_output: 원래 x에 ffn 출력 더하기
    x = x + ffn_output

    # Layer Normalization
    x = self.norm2(x)

    # 최종 출력: (32, 20, 256)
    return x

# **6. Transformer Encoder**

## 여러 개의 Transformer 인코더 레이어를 쌓음
    
  #### **구조**:
  ```
  입력 시퀀스
      ↓
    [Embedding + Positional Encoding]
      ↓
    [인코더 레이어 1: Attention + FFN]
      ↓
    [인코더 레이어 2: Attention + FFN]
      ↓
    ...
      ↓
    [인코더 레이어 6: Attention + FFN]
      ↓
    출력 (각 토큰의 풍부한 표현)
    
    각 레이어마다 점점 더 복잡하고 추상적인 표현으로 변환됨
  ```
  

In [8]:
class TransformerEncoder(nn.Module):
  def __init__(self, d_model, num_layers=6, num_heads=8, d_ff=2048,
                 max_seq_len=512, dropout=0.1):
    super().__init__()  # 부모 클래스 초기화

    # 위치 인코딩 모듈
    # 시퀀스의 위치 정보를 추가
    # max_seq_len=512: 최대 512 길이 시퀀스까지 지원
    self.positional_encoding = PositionalEncoding(d_model, max_seq_len)

    # num_layers개의 인코더 레이어 생성
    # nn.ModuleList: 여러 모듈을 리스트로 관리
    # 예: 6개의 TransformerEncoderLayer

# **미니 실험: 문장 감정 분류**

### 실험 모델 조립 (Transformer Classifier)

In [9]:
import torch
import torch.nn as nn

class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, num_classes, max_len, dropout=0.1):
        super().__init__()
        # 1. 임베딩 및 위치 인코딩
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        # 2. Transformer Encoder Layers를 리스트로 쌓음
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # 3. 분류를 위한 최종 출력층
        # 전체 문장의 특징을 하나로 합치기 위해 평균(Mean Pooling)을 사용하거나
        # 첫 번째 토큰([CLS])을 사용합니다. 여기서는 평균을 사용합니다.
        self.fc = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # x: (batch_size, seq_len)
        out = self.embedding(x) # (batch_size, seq_len, d_model)
        out = self.pos_encoding(out)

        # 여러 개의 Encoder Layer를 통과
        for layer in self.layers:
            out = layer(out, mask)

        # Global Average Pooling: 문장 전체 토큰의 벡터를 평균내어 문장 벡터 생성
        # (batch_size, seq_len, d_model) -> (batch_size, d_model)
        out = out.mean(dim=1)

        out = self.dropout(out)
        return self.fc(out) # (batch_size, num_classes)

### 영화 리뷰 감정 분류

In [10]:
# 샘플 데이터셋
data = [
    ("i love this movie", 1),
    ("this film is great", 1),
    ("i hate this movie", 0),
    ("this film is terrible", 0)
]

# 단어 사전 구축 및 정수 인코딩
word_list = " ".join([d[0] for d in data]).split()
vocab = {"<PAD>": 0}
for word in set(word_list):
    vocab[word] = len(vocab)

vocab_size = len(vocab)
max_len = 5 # 모든 문장의 길이를 5로 맞춤

def encode(text):
    encoded = [vocab[w] for w in text.split()]
    # 길이를 max_len에 맞게 패딩(0)
    return encoded + [0] * (max_len - len(encoded))

# 학습 데이터 텐서 변환
input_batch = torch.tensor([encode(d[0]) for d in data]) # (4, 5)
target_batch = torch.tensor([d[1] for d in data])        # (4)

In [13]:
# 모델 초기화
model = TransformerClassifier(vocab_size, d_model=128, num_heads=4, d_ff=512,
                              num_layers=2, num_classes=2, max_len=max_len)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 학습 시작
model.train()
for epoch in range(100):
    optimizer.zero_grad()
    outputs = model(input_batch)
    loss = criterion(outputs, target_batch)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/100], Loss: {loss.item():.4f}")

# 결과 해석
model.eval()
with torch.no_grad():
    test_text = "i love film" # 학습에 없던 조합
    test_input = torch.tensor([encode(test_text)])
    prediction = model(test_input)
    pred_class = torch.argmax(prediction, dim=1).item()

    sentiment = "긍정" if pred_class == 1 else "부정"
    print(f"\n테스트 문장: '{test_text}'")
    print(f"모델의 예측 결과: {sentiment}")

Epoch [20/100], Loss: 0.0014
Epoch [40/100], Loss: 0.0004
Epoch [60/100], Loss: 0.0002
Epoch [80/100], Loss: 0.0002
Epoch [100/100], Loss: 0.0001

테스트 문장: 'i love film'
모델의 예측 결과: 긍정
