# Chapter 07 실습 1: Transformer 텍스트 분류

## 실습 목표
- Positional Encoding 클래스를 직접 구현한다
- TransformerBlock 클래스(MultiHeadAttention + FFN)를 구현한다
- TextVectorization으로 텍스트 전처리 파이프라인을 구성한다
- IMDB 데이터에 Transformer 분류 모델을 적용하고 평가한다
- BiLSTM 모델과 성능을 비교한다 (도전 과제)

## 실습 개요

```
입력 텍스트
    ↓ TextVectorization
정수 시퀀스
    ↓ Embedding + Positional Encoding
임베딩 + 위치 정보
    ↓ TransformerBlock × 2
컨텍스트 벡터
    ↓ GlobalAveragePooling1D
고정 크기 벡터
    ↓ Dense(64, relu) → Dense(1, sigmoid)
감성 예측 (긍정/부정)
```

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import matplotlib

# 한글 폰트 설정 (macOS)
matplotlib.rcParams['font.family'] = 'AppleGothic'
matplotlib.rcParams['axes.unicode_minus'] = False

print(f'TensorFlow 버전: {tf.__version__}')

# 재현성 시드
tf.random.set_seed(42)
np.random.seed(42)

# 하이퍼파라미터 설정
VOCAB_SIZE    = 20000   # 어휘 크기
MAX_LEN       = 200     # 최대 시퀀스 길이
EMBED_DIM     = 64      # 임베딩 차원 (= d_model)
NUM_HEADS     = 4       # Multi-Head Attention 헤드 수
FF_DIM        = 256     # Feed-Forward 내부 차원 (= d_ff)
NUM_BLOCKS    = 2       # Transformer Block 수
DROPOUT_RATE  = 0.1     # Dropout 비율
BATCH_SIZE    = 64      # 배치 크기
EPOCHS        = 10      # 학습 에포크 수
LEARNING_RATE = 1e-4    # 학습률

print('하이퍼파라미터 설정 완료')
print(f'  VOCAB_SIZE:   {VOCAB_SIZE}')
print(f'  MAX_LEN:      {MAX_LEN}')
print(f'  EMBED_DIM:    {EMBED_DIM}')
print(f'  NUM_HEADS:    {NUM_HEADS}')
print(f'  FF_DIM:       {FF_DIM}')
print(f'  NUM_BLOCKS:   {NUM_BLOCKS}')

In [None]:
class PositionalEncoding(keras.layers.Layer):
    """
    Positional Encoding 레이어
    토큰 임베딩에 위치 정보를 더함
    
    PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
    PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
    """
    def __init__(self, max_seq_len, d_model, **kwargs):
        super().__init__(**kwargs)
        self.max_seq_len = max_seq_len
        self.d_model     = d_model
        
        # 토큰 임베딩 레이어
        self.embedding = keras.layers.Embedding(
            input_dim=VOCAB_SIZE + 1,  # +1: 패딩 인덱스 0 포함
            output_dim=d_model,
            mask_zero=True             # 패딩 토큰 마스킹 활성화
        )
        
        # Positional Encoding 행렬 사전 계산
        self.pos_encoding = self._create_positional_encoding(max_seq_len, d_model)
    
    def _create_positional_encoding(self, max_seq_len, d_model):
        """sin/cos 기반 Positional Encoding 계산"""
        # 위치 벡터: shape = (max_seq_len, 1)
        positions = np.arange(max_seq_len, dtype=np.float32)[:, np.newaxis]
        
        # 차원 인덱스 (짝수만): shape = (1, d_model // 2)
        dims = np.arange(0, d_model, 2, dtype=np.float32)[np.newaxis, :]
        
        # 주파수 분모: 10000^(2i/d_model)
        div_term = np.power(10000.0, dims / d_model)
        
        # PE 행렬 초기화
        pe = np.zeros((max_seq_len, d_model), dtype=np.float32)
        pe[:, 0::2] = np.sin(positions / div_term)  # 짝수 차원: sin
        pe[:, 1::2] = np.cos(positions / div_term)  # 홀수 차원: cos
        
        # 배치 차원 추가: (1, max_seq_len, d_model)
        return tf.constant(pe[np.newaxis, :, :], dtype=tf.float32)
    
    def call(self, x):
        seq_len = tf.shape(x)[1]
        
        # 토큰 임베딩: (batch, seq_len) → (batch, seq_len, d_model)
        x_embed = self.embedding(x)
        
        # 임베딩 스케일링: sqrt(d_model)을 곱해 PE와 크기를 맞춤
        x_embed *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        
        # Positional Encoding 더하기 (브로드캐스팅)
        x_embed += self.pos_encoding[:, :seq_len, :]
        
        return x_embed
    
    def get_config(self):
        config = super().get_config()
        config.update({'max_seq_len': self.max_seq_len, 'd_model': self.d_model})
        return config


# 동작 확인
pe_layer = PositionalEncoding(MAX_LEN, EMBED_DIM)
dummy_tokens = tf.ones((2, 10), dtype=tf.int32)  # (batch=2, seq=10)
pe_output = pe_layer(dummy_tokens)
print('PositionalEncoding 입력 shape:', dummy_tokens.shape)
print('PositionalEncoding 출력 shape:', pe_output.shape)  # (2, 10, 64)

In [None]:
class TransformerBlock(keras.layers.Layer):
    """
    Transformer Encoder Block
    Multi-Head Self-Attention + Add&Norm + FFN + Add&Norm
    
    구조:
        x → MHA(Q=K=V=x) → Add → LayerNorm → FFN → Add → LayerNorm → 출력
    """
    def __init__(self, embed_dim, num_heads, ff_dim, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim   = embed_dim
        self.num_heads   = num_heads
        self.ff_dim      = ff_dim
        self.dropout_rate = dropout_rate
        
        # --- Multi-Head Self-Attention ---
        self.mha = keras.layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim // num_heads,  # 헤드당 차원
            dropout=dropout_rate
        )
        
        # --- Feed-Forward Network ---
        # FFN(x) = max(0, xW1 + b1)W2 + b2
        self.ffn = keras.Sequential([
            keras.layers.Dense(ff_dim, activation='relu'),  # 확장: embed_dim → ff_dim
            keras.layers.Dense(embed_dim)                   # 축소: ff_dim → embed_dim
        ], name='ffn')
        
        # --- Layer Normalization (Pre-LN 방식) ---
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        
        # --- Dropout ---
        self.dropout1 = keras.layers.Dropout(dropout_rate)
        self.dropout2 = keras.layers.Dropout(dropout_rate)
    
    def call(self, x, training=False):
        # --- Sub-Layer 1: Multi-Head Self-Attention ---
        # Self-Attention: Q = K = V = x
        attn_output = self.mha(
            query=x, key=x, value=x,
            training=training
        )
        attn_output = self.dropout1(attn_output, training=training)
        # 잔차 연결(Residual) + 레이어 정규화
        out1 = self.layernorm1(x + attn_output)
        
        # --- Sub-Layer 2: Position-wise Feed-Forward Network ---
        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout2(ffn_output, training=training)
        # 잔차 연결 + 레이어 정규화
        out2 = self.layernorm2(out1 + ffn_output)
        
        return out2  # (batch, seq_len, embed_dim)
    
    def get_config(self):
        config = super().get_config()
        config.update({
            'embed_dim':    self.embed_dim,
            'num_heads':    self.num_heads,
            'ff_dim':       self.ff_dim,
            'dropout_rate': self.dropout_rate
        })
        return config


# 동작 확인
tf_block = TransformerBlock(EMBED_DIM, NUM_HEADS, FF_DIM, DROPOUT_RATE)
dummy_embed = tf.random.normal((2, 10, EMBED_DIM))  # (batch, seq, embed)
block_out = tf_block(dummy_embed, training=False)
print('TransformerBlock 입력 shape:', dummy_embed.shape)
print('TransformerBlock 출력 shape:', block_out.shape)  # (2, 10, 64)
print('입출력 차원 동일:', dummy_embed.shape == block_out.shape)

In [None]:
# IMDB 데이터 로드
print('IMDB 데이터 로딩...')
(x_train_raw, y_train), (x_test_raw, y_test) = keras.datasets.imdb.load_data(
    num_words=VOCAB_SIZE
)

# 패딩 처리
x_train_padded = keras.preprocessing.sequence.pad_sequences(
    x_train_raw, maxlen=MAX_LEN, padding='post', truncating='post'
)
x_test_padded = keras.preprocessing.sequence.pad_sequences(
    x_test_raw, maxlen=MAX_LEN, padding='post', truncating='post'
)

print(f'훈련 데이터: {x_train_padded.shape}')
print(f'테스트 데이터: {x_test_padded.shape}')
print(f'레이블 분포 (훈련): 긍정={y_train.sum()}, 부정={len(y_train)-y_train.sum()}')

# TextVectorization을 사용한 추가 전처리 예시
# (이미 정수 인코딩된 IMDB 데이터와 병행하여 사용 가능)
print('\n--- TextVectorization 사용 예시 ---')
# 실제 텍스트 데이터가 있을 경우 사용 방법:
sample_texts = [
    "This movie was absolutely fantastic!",
    "I really enjoyed the plot and characters.",
    "Terrible film, complete waste of time."
]

# TextVectorization 레이어 설정
vectorizer = keras.layers.TextVectorization(
    max_tokens=VOCAB_SIZE,           # 최대 어휘 크기
    output_mode='int',               # 정수 인코딩
    output_sequence_length=MAX_LEN   # 고정 길이 출력
)

# 텍스트 어휘 학습 (adapt)
vectorizer.adapt(sample_texts)

# 변환 예시
vectorized = vectorizer(sample_texts[:1])
print(f'입력 텍스트: "{sample_texts[0]}"')
print(f'벡터화 결과: {vectorized[0].numpy()[:10]}...')
print(f'어휘 크기: {len(vectorizer.get_vocabulary())}')

In [None]:
def build_transformer_classifier():
    """
    전체 Transformer 텍스트 분류 모델 구성:
    Input → Embedding + PE → TransformerBlock × 2 → GlobalAvgPool → Dense → 출력
    """
    # 입력 레이어
    inputs = keras.Input(shape=(MAX_LEN,), dtype=tf.int32, name='token_ids')
    
    # --- Stage 1: Positional Embedding ---
    x = PositionalEncoding(
        max_seq_len=MAX_LEN,
        d_model=EMBED_DIM,
        name='positional_encoding'
    )(inputs)
    x = keras.layers.Dropout(DROPOUT_RATE)(x)
    
    # --- Stage 2: Transformer Blocks × NUM_BLOCKS ---
    for i in range(NUM_BLOCKS):
        x = TransformerBlock(
            embed_dim=EMBED_DIM,
            num_heads=NUM_HEADS,
            ff_dim=FF_DIM,
            dropout_rate=DROPOUT_RATE,
            name=f'transformer_block_{i+1}'
        )(x)
    
    # --- Stage 3: 시퀀스 → 벡터 변환 ---
    # GlobalAveragePooling1D: 시퀀스 차원에 대해 평균
    x = keras.layers.GlobalAveragePooling1D(name='global_avg_pool')(x)
    
    # --- Stage 4: 분류 헤드 ---
    x = keras.layers.Dense(64, activation='relu', name='dense_hidden')(x)
    x = keras.layers.Dropout(DROPOUT_RATE)(x)
    outputs = keras.layers.Dense(1, activation='sigmoid', name='output')(x)
    
    model = keras.Model(inputs=inputs, outputs=outputs, name='TransformerClassifier')
    return model


# 모델 생성
model = build_transformer_classifier()

# 컴파일
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

model.summary()

# 파라미터 수 계산
total_params = model.count_params()
print(f'\n총 파라미터 수: {total_params:,}')
print(f'  = 약 {total_params/1e6:.2f}M')

In [None]:
# 학습
print('Transformer 분류 모델 학습 시작...')

callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=3,
        restore_best_weights=True,
        verbose=1
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=2,
        verbose=1
    )
]

history = model.fit(
    x_train_padded, y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1
)

# 테스트 평가
test_loss, test_acc = model.evaluate(x_test_padded, y_test, verbose=0)
print(f'\n테스트 정확도: {test_acc:.4f} ({test_acc*100:.2f}%)')
print(f'테스트 손실:   {test_loss:.4f}')

# 학습 곡선 시각화
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 정확도
ax1 = axes[0]
ax1.plot(history.history['accuracy'], 'b-o', label='훈련 정확도', linewidth=2)
ax1.plot(history.history['val_accuracy'], 'r-s', label='검증 정확도', linewidth=2)
ax1.set_title(f'정확도\n(최종 테스트: {test_acc:.4f})', fontsize=12)
ax1.set_xlabel('에포크')
ax1.set_ylabel('정확도')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.set_ylim([0.5, 1.0])

# 손실
ax2 = axes[1]
ax2.plot(history.history['loss'], 'b-o', label='훈련 손실', linewidth=2)
ax2.plot(history.history['val_loss'], 'r-s', label='검증 손실', linewidth=2)
ax2.set_title(f'손실\n(최종 테스트: {test_loss:.4f})', fontsize=12)
ax2.set_xlabel('에포크')
ax2.set_ylabel('손실')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.suptitle('Transformer 텍스트 분류 학습 결과', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# 예측 예시
print('\n=== 예측 예시 ===')
n_examples = 5
preds = model.predict(x_test_padded[:n_examples], verbose=0)
for i, (pred, true_label) in enumerate(zip(preds, y_test[:n_examples])):
    pred_label = '긍정' if pred[0] > 0.5 else '부정'
    true_str   = '긍정' if true_label == 1 else '부정'
    correct    = '✓' if pred_label == true_str else '✗'
    print(f'샘플 {i+1}: 예측={pred_label} ({pred[0]:.3f}) | 실제={true_str} {correct}')

## 도전 과제: BiLSTM과 성능 비교

아래 BiLSTM 모델을 구현하고 Transformer 모델과 성능을 비교해 보자.

### BiLSTM 모델 구조 (참고)

```python
def build_bilstm_classifier():
    inputs = keras.Input(shape=(MAX_LEN,), dtype=tf.int32)
    x = keras.layers.Embedding(VOCAB_SIZE + 1, EMBED_DIM, mask_zero=True)(inputs)
    x = keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True))(x)
    x = keras.layers.Bidirectional(keras.layers.LSTM(32))(x)
    x = keras.layers.Dense(64, activation='relu')(x)
    x = keras.layers.Dropout(0.3)(x)
    outputs = keras.layers.Dense(1, activation='sigmoid')(x)
    return keras.Model(inputs, outputs, name='BiLSTM_Classifier')
```

### 비교 항목

| 항목 | Transformer | BiLSTM |
|------|------------|--------|
| 테스트 정확도 | ? | ? |
| 학습 시간/에포크 | ? | ? |
| 파라미터 수 | ? | ? |
| 수렴 속도 | ? | ? |

### 추가 탐구 아이디어

1. `NUM_BLOCKS`를 1, 2, 4로 변경하여 성능 변화 관찰
2. `NUM_HEADS`를 2, 4, 8로 변경하여 성능 변화 관찰
3. `GlobalAveragePooling1D` 대신 `[CLS]` 토큰 방식 구현
4. 학습률 스케줄러(Warm-up + Decay) 적용
5. 데이터 증강: 동의어 대체(Synonym Replacement) 적용