# Step 1. 데이터 수집하기

In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Step 2. 데이터 전처리하기

In [2]:
data = pd.read_csv('data/ChatbotData.csv')

# 데이터 확인
print(data.head())
print()
print(data.info())
print()
print(data['label'].value_counts())
print(f"\n결측치 확인:\n{data.isnull().sum()}\n")

                 Q            A  label
0           12시 땡!   하루가 또 가네요.      0
1      1지망 학교 떨어졌어    위로해 드립니다.      0
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠.      0
4          PPL 심하네   눈살이 찌푸려지죠.      0

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
 2   label   11823 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 277.2+ KB
None

0    5290
1    3570
2    2963
Name: label, dtype: int64

결측치 확인:
Q        0
A        0
label    0
dtype: int64



In [3]:
# 텍스트 정제
def clean_text(text):
    text = text.strip()
    text = re.sub(r"[^가-힣a-zA-Z0-9?.!,]+", " ", text) # 특수문자 일부 제외하고 제거
    return text

# 'Q', 'A' 컬럼에 적용
data['Q'] = data['Q'].apply(clean_text)
data['A'] = data['A'].apply(clean_text)

# Step 3. SubwordTextEncoder 사용하기

## 토크나이저 학습용 코퍼스 생성 (질문 + 답변)

In [4]:
corpus = data['Q'].tolist() + data['A'].tolist()
print(f"\n코퍼스 크기 (문장 수): {len(corpus)}")


코퍼스 크기 (문장 수): 23646


## SubwordTextEncoder 구축

In [5]:
# target_vocab_size를 적절히 설정 (데이터셋 크기에 따라 조절)
# 너무 작으면 UNK 토큰이 많아지고, 너무 크면 희소해질 수 있음
target_vocab_size = 10000 # 예시 크기, 필요에 따라 조절

print(f"\nSubwordTextEncoder 구축 시작 (target_vocab_size={target_vocab_size})...")
try:
    # tfds.deprecated.text 사용
    tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
        corpus, target_vocab_size=target_vocab_size
    )
    print("SubwordTextEncoder 구축 완료.")
    print(f"단어 사전 크기 (실제): {tokenizer.vocab_size}") # 목표 크기와 약간 다를 수 있음
except AttributeError:
    print("\nError: tfds.deprecated.text.SubwordTextEncoder 를 찾을 수 없습니다.")
    print("tensorflow_datasets 버전이 너무 낮거나 높아서 경로가 변경되었을 수 있습니다.")
    print("최신 버전에서는 tensorflow_text 또는 Hugging Face Tokenizers 사용을 권장합니다.")
    exit()
except Exception as e:
    print(f"\nSubwordTextEncoder 구축 중 오류 발생: {e}")
    exit()


SubwordTextEncoder 구축 시작 (target_vocab_size=10000)...
SubwordTextEncoder 구축 완료.
단어 사전 크기 (실제): 10100


## 데이터 인코딩 및 디코딩 확인

In [6]:
sample_index = 0
sample_q = data['Q'][sample_index]
sample_a = data['A'][sample_index]

# 인코딩
encoded_q = tokenizer.encode(sample_q)
encoded_a = tokenizer.encode(sample_a)

print(f"\n--- 샘플 인코딩 (index={sample_index}) ---")
print(f"원본 질문 (Q): {sample_q}")
print(f"인코딩된 Q: {encoded_q}")
print(f"원본 답변 (A): {sample_a}")
print(f"인코딩된 A: {encoded_a}")

# 디코딩
decoded_q = tokenizer.decode(encoded_q)
decoded_a = tokenizer.decode(encoded_a)

print(f"\n--- 샘플 디코딩 ---")
print(f"디코딩된 Q: {decoded_q}")
print(f"디코딩된 A: {decoded_a}")


# 서브워드 확인 (어떻게 분리되었는지)
subwords_q = [tokenizer.decode([token]) for token in encoded_q]
subwords_a = [tokenizer.decode([token]) for token in encoded_a]
print(f"\n--- 서브워드 분리 결과 ---")
print(f"Q 서브워드: {' | '.join(subwords_q)}")
print(f"A 서브워드: {' | '.join(subwords_a)}")


--- 샘플 인코딩 (index=0) ---
원본 질문 (Q): 12시 땡!
인코딩된 Q: [6913, 3002, 4768, 9877]
원본 답변 (A): 하루가 또 가네요.
인코딩된 A: [3326, 66, 6893, 9890]

--- 샘플 디코딩 ---
디코딩된 Q: 12시 땡!
디코딩된 A: 하루가 또 가네요.

--- 서브워드 분리 결과 ---
Q 서브워드: 12 | 시  | 땡 | !
A 서브워드: 하루가  | 또  | 가네요 | .


# Step 4. 모델 구성하기

## 포지셔널 인코딩
- 순서 정보를 내재적으로 처리하지 못하는 트랜스포머 구조 특성상, 단어의 상대적 또는 절대적 위치를 모델에 알려주기 위해 임베딩 벡터에 포지셔널 인코딩 값을 더함

In [7]:
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    # position : 최대 시퀀스 길이
    # d_model : 임베딩 차원
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                           np.arange(d_model)[np.newaxis, :],
                           d_model)
    
    # 배열의 짝수 인덱스에는 사인 함수 적용
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    # 배열의 홀수 인덱스에는 코사인 함수 적용
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    
    # 배치 차원 추가
    pos_encoding = angle_rads[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

## 마스크 생성
- 어텐션 매커니즘이 불필요한 부분에 집중하지 않도록 마스크 사용

In [8]:
def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask # (seq_len, seq_len)

## 스케일드 닷 프로덕트 어텐션
- 트랜스포머 어텐션 핵심 계산 메커니즘
- Q가 K와 유사도 내적을 계산하고, 이를 K의 차원 수 제곱근으로 나누어 스케일링
- 이후 마스크 적용하고 Softmax로 어텐션 가중치를 구한 뒤, 가중치를 V에 곱하여 최종 어텐션 값 얻음

In [9]:
def scaled_dot_product_attention(q, k, v, mask):
    # Query와 Key의 내적 계산
    matmul_qk = tf.matmul(q, k, transpose_b=True) # (..., seq_len_q, seq_len_k)

    # 스케일링 (dk: Key의 마지막 차원 크기)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # 마스크 적용 (마스크된 위치는 매우 작은 값으로 설정)
    if mask is not None:
        # mask 값이 1인 위치에 -1e9를 더해줌 (softmax 후 0에 가깝게 만듦)
        scaled_attention_logits += (mask * -1e9)

    # Softmax로 어텐션 가중치 계산 (마지막 축 기준)
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) # (..., seq_len_q, seq_len_k)

    # Value와 어텐션 가중치 곱셈
    output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v)
    return output, attention_weights

## 멀티 헤드 어텐션
- 한 번의 어텐션 대신 Q, K, V를 여러 개의 헤드로 나누어 각각 다른 관점에서 스케일드 닷 프로덕트 어텐션을 병렬로 수행
- 각 헤드의 결과를 concat하고 마지막으로 Dense를 통과시켜 최종 어텐션 결과 얻음
- 이를 통해 모델이 다양한 측면의 정보에 동시에 집중할 수 있게함

In [10]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % self.num_heads == 0 # d_model은 헤드 수로 나누어 떨어져야 함
        self.depth = d_model // self.num_heads # 각 헤드의 차원

        # Q, K, V 및 최종 출력을 위한 Dense 레이어
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        # (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, depth)
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3]) # 헤드 차원을 앞으로

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        # 1. Q, K, V를 각각의 Dense 레이어 통과 (선형 변환)
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        # 2. 헤드 분할
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        # 3. 스케일드 닷 프로덕트 어텐션 수행 (병렬)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)
        # scaled_attention: (batch_size, num_heads, seq_len_q, depth)

        # 4. 헤드 연결 (Concatenate)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, num_heads, depth)
        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model)) # (batch_size, seq_len_q, d_model)

        # 5. 최종 Dense 레이어 통과
        output = self.dense(concat_attention)
        return output, attention_weights

## 포인트 와이즈 피드 포워드 신경망
- 각 위치마다 독립적으로 적용되는 간단한 완전 연결 신경망
- 첫 번째 레이어는 활성화 함수 포함, 차원 확장했다가 두번째 레이어에서 다시 원래 차원으로 축소

In [11]:
def point_wise_feed_forward_network(d_model, dff):
    # d_model: 모델의 기본 차원
    # dff: Feed Forward Network 내부 레이어의 차원 (보통 d_model * 4)
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation='relu'), # 확장
        tf.keras.layers.Dense(d_model)                 # 축소
    ])

## 인코더 레이어
- 첫 번째 서브 레이어: 멀티 헤드 어텐션 (Self-Attention, 즉 Q, K, V가 모두 이전 층의 출력) 수행 후 Add & Norm (잔차 연결 + 레이어 정규화).
- 두 번째 서브 레이어: 포인트 와이즈 피드 포워드 신경망 수행 후 Add & Norm. Dropout은 각 서브 레이어의 출력에 적용되어 과적합을 방지

In [12]:
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        # 레이어 정규화
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # 드롭아웃
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask): # mask: 패딩 마스크
        # 1. Multi-Head Self-Attention -> Dropout -> Add & Norm
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output) # 입력 x와 어텐션 출력을 더함 (Residual Connection)

        # 2. Feed Forward Network -> Dropout -> Add & Norm
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output) # 이전 결과 out1과 FFN 출력을 더함

        return out2

## 디코더 레이어
- 첫 번째 서브 레이어: Masked 멀티 헤드 어텐션 (Self-Attention, Look-ahead 마스크 사용) 수행 후 Add & Norm. 디코더가 이전에 생성한 단어들만 참조
- 두 번째 서브 레이어: 멀티 헤드 어텐션 (Encoder-Decoder Attention). Query는 이전 서브 레이어의 출력, Key와 Value는 인코더의 최종 출력을 사용합니다. 인코더 출력 중 어떤 부분에 집중할지 결정. 이후 Add & Norm.
- 세 번째 서브 레이어: 포인트 와이즈 피드 포워드 신경망 수행 후 Add & Norm.

In [13]:
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()
        # Masked Self-Attention (첫 번째 MHA)
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        # Encoder-Decoder Attention (두 번째 MHA)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # x: 디코더 입력 (이전 레이어 출력)
        # enc_output: 인코더 최종 출력
        # look_ahead_mask: 디코더 첫번째 MHA용 마스크 (Self-Attention Mask)
        # padding_mask: 디코더 두번째 MHA용 마스크 (Encoder Output Padding Mask)

        # 1. Masked Multi-Head Self-Attention -> Dropout -> Add & Norm
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask) # Q, K, V 모두 x
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # 2. Encoder-Decoder Attention -> Dropout -> Add & Norm
        # Query: out1 (이전 결과), Key/Value: enc_output (인코더 출력)
        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1) # 이전 결과 out1과 두번째 어텐션 결과 더함

        # 3. Feed Forward Network -> Dropout -> Add & Norm
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2) # 이전 결과 out2와 FFN 출력 더함

        return out3, attn_weights_block1, attn_weights_block2

## 인코더
- 입력 시퀀스를 받아 임베딩과 포지셔널 인코딩을 적용한 후, 여러 개의 EncoderLayer를 순차적으로 통과시켜 입력 시퀀스의 최종 표현(representation) 벡터 생성

In [14]:
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers # EncoderLayer 반복 횟수

        # 임베딩 레이어
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # 포지셔널 인코딩 값 (미리 계산)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)

        # num_layers 만큼 EncoderLayer 생성
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask): # x: 인코더 입력 시퀀스, mask: 패딩 마스크
        seq_len = tf.shape(x)[1]

        # 1. 임베딩 + 포지셔널 인코딩
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32)) # 임베딩 값 스케일링 (논문 방식)
        x += self.pos_encoding[:, :seq_len, :] # 포지셔널 인코딩 더하기

        x = self.dropout(x, training=training)

        # 2. EncoderLayer 스택 통과
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x # 최종 인코더 출력 (batch_size, input_seq_len, d_model)

## 디코더
- 타겟 시퀀스(보통 <SOS> 토큰으로 시작)를 받아 임베딩과 포지셔널 인코딩을 적용한 후, 여러 개의 DecoderLayer를 순차적으로 통과
- 각 DecoderLayer는 인코더의 최종 출력(enc_output)을 참조하여 다음 단어를 예측하는 데 필요한 정보를 생성

In [15]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # x: 디코더 입력 시퀀스
        # enc_output: 인코더 최종 출력
        # look_ahead_mask: 디코더 첫번째 MHA용 마스크
        # padding_mask: 디코더 두번째 MHA용 마스크
        seq_len = tf.shape(x)[1]
        attention_weights = {} # 각 레이어의 어텐션 가중치 저장 (시각화 등 활용)

        # 1. 임베딩 + 포지셔널 인코딩
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        # 2. DecoderLayer 스택 통과
        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                   look_ahead_mask, padding_mask)
            # 어텐션 가중치 저장
            attention_weights[f'decoder_layer{i+1}_block1'] = block1 # Masked Self-Attention
            attention_weights[f'decoder_layer{i+1}_block2'] = block2 # Encoder-Decoder Attention

        # 최종 디코더 출력 (batch_size, target_seq_len, d_model)
        return x, attention_weights

## 트랜스포머 모델
- 최종 트랜스포머 모델 클래스

In [16]:
class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()
        # 인코더 초기화
        self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                               input_vocab_size, pe_input, rate)
        # 디코더 초기화
        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               target_vocab_size, pe_target, rate)
        # 최종 출력 레이어 (단어 사전 크기로 매핑)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        # inp: 인코더 입력 (질문)
        # tar: 디코더 입력 (답변, <SOS> 시작)
        # training: 학습 여부 (True/False)
        # enc_padding_mask: 인코더 패딩 마스크
        # look_ahead_mask: 디코더 첫번째 MHA용 마스크
        # dec_padding_mask: 디코더 두번째 MHA용 마스크

        # 1. 인코더 통과
        enc_output = self.encoder(inp, training, enc_padding_mask) # (batch_size, inp_seq_len, d_model)

        # 2. 디코더 통과
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask) # (batch_size, tar_seq_len, d_model)

        # 3. 최종 선형 레이어 통과
        final_output = self.final_layer(dec_output) # (batch_size, tar_seq_len, target_vocab_size)

        return final_output, attention_weights # 최종 로짓과 어텐션 가중치 반환

## 학습

In [17]:
# 1. 데이터 준비 (인코딩, 토큰 추가, 패딩)

# 상수 정의 (★ 중요: 모델 생성 및 평가 시 동일하게 사용 ★)
MAX_LENGTH = 40      # 예시 최대 길이, 데이터 분포 및 리소스 고려하여 결정
BUFFER_SIZE = 20000  # 데이터 셔플링 버퍼 크기
BATCH_SIZE = 64      # 배치 크기

# SOS/EOS 토큰 정의
START_TOKEN = tokenizer.vocab_size
END_TOKEN = tokenizer.vocab_size + 1
print(f"START_TOKEN: {START_TOKEN}, END_TOKEN: {END_TOKEN}")
print(f"MAX_LENGTH: {MAX_LENGTH}, BATCH_SIZE: {BATCH_SIZE}")

# 전체 데이터 인코딩 및 SOS/EOS 추가
print("전체 데이터 인코딩 및 토큰 추가 중...")
try:
    all_questions = [tokenizer.encode(q) for q in data['Q']]
    all_answers = [tokenizer.encode(a) for a in data['A']]
    print("인코딩 완료.")

    # 입력/출력 시퀀스 생성 및 패딩
    encoder_input = tf.keras.preprocessing.sequence.pad_sequences(
        all_questions, maxlen=MAX_LENGTH, padding='post'
    )
    decoder_input = tf.keras.preprocessing.sequence.pad_sequences(
        [[START_TOKEN] + a for a in all_answers], maxlen=MAX_LENGTH, padding='post'
    )
    decoder_target = tf.keras.preprocessing.sequence.pad_sequences(
        [a + [END_TOKEN] for a in all_answers], maxlen=MAX_LENGTH, padding='post'
    )

    print(f"\n패딩된 데이터 Shape:")
    print(f"Encoder Input: {encoder_input.shape}")
    print(f"Decoder Input: {decoder_input.shape}")
    print(f"Decoder Target: {decoder_target.shape}")

except Exception as e:
    print(f"\n데이터 인코딩 또는 패딩 중 오류 발생: {e}")
    raise e

START_TOKEN: 10100, END_TOKEN: 10101
MAX_LENGTH: 40, BATCH_SIZE: 64
전체 데이터 인코딩 및 토큰 추가 중...
인코딩 완료.

패딩된 데이터 Shape:
Encoder Input: (11823, 40)
Decoder Input: (11823, 40)
Decoder Target: (11823, 40)


In [18]:
# 2. 데이터셋 분할 및 tf.data.Dataset 생성

from sklearn.model_selection import train_test_split

# 데이터를 훈련 세트와 검증 세트로 분할 (예: 90% 훈련, 10% 검증)
enc_train, enc_val, dec_in_train, dec_in_val, dec_out_train, dec_out_val = train_test_split(
    encoder_input, decoder_input, decoder_target, test_size=0.1, random_state=42
)

print(f"\n훈련 데이터셋 크기: {len(enc_train)}")
print(f"검증 데이터셋 크기: {len(enc_val)}")

# tf.data.Dataset 객체 생성 (학습 효율성 향상)
train_dataset = tf.data.Dataset.from_tensor_slices((
    {'inputs': enc_train, 'dec_inputs': dec_in_train}, # 모델 입력은 딕셔너리 형태 권장
    {'outputs': dec_out_train}                         # 모델 출력(타겟)도 딕셔너리 형태 권장
))
train_dataset = train_dataset.cache() # 데이터를 메모리에 캐싱하여 속도 향상 (메모리 부족 시 제거)
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE) # 학습 중 다음 배치를 미리 로드

val_dataset = tf.data.Dataset.from_tensor_slices((
    {'inputs': enc_val, 'dec_inputs': dec_in_val},
    {'outputs': dec_out_val}
))
val_dataset = val_dataset.batch(BATCH_SIZE)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)

print("\n훈련 및 검증용 tf.data.Dataset 생성 완료.")
# print(f"훈련 데이터셋 샘플 (첫 배치): {next(iter(train_dataset))}") # 데이터 형태 확인용 (선택적)


훈련 데이터셋 크기: 10640
검증 데이터셋 크기: 1183

훈련 및 검증용 tf.data.Dataset 생성 완료.


In [19]:
# 3. 모델 인스턴스 생성

# 하이퍼파라미터 설정 (★ 성능 위해 튜닝 필요 ★)
NUM_LAYERS = 2       # 인코더/디코더 레이어 수 (낮으면 학습 빠름, 높으면 성능 기대)
D_MODEL = 256        # 임베딩 및 모델 내부 벡터 차원
NUM_HEADS = 8        # 멀티 헤드 어텐션 헤드 수 (D_MODEL의 약수여야 함)
DFF = 512            # Feed Forward Network 내부 차원
DROPOUT_RATE = 0.1   # 드롭아웃 비율

# 어휘 사전 크기 (START_TOKEN, END_TOKEN 포함)
INPUT_VOCAB_SIZE = target_vocab_size = START_TOKEN + 2 # tokenizer.vocab_size + 2

print("\nTransformer 모델 인스턴스 생성 중...")
# Step 4에서 정의한 Transformer 클래스 사용
transformer = Transformer(
    num_layers=NUM_LAYERS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dff=DFF,
    input_vocab_size=INPUT_VOCAB_SIZE,
    target_vocab_size=target_vocab_size,
    pe_input=MAX_LENGTH,  # 포지셔널 인코딩 최대 길이 (인코더)
    pe_target=MAX_LENGTH, # 포지셔널 인코딩 최대 길이 (디코더)
    rate=DROPOUT_RATE
)
print("모델 인스턴스 생성 완료.")


Transformer 모델 인스턴스 생성 중...
모델 인스턴스 생성 완료.


In [20]:
# 4. 손실 함수, 옵티마이저, 평가지표 정의

# 손실 함수 (패딩 마스킹 적용)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none') # 각 샘플별 손실 계산 후 평균내기 위해 reduction='none'

def loss_function(real, pred):
    # real: 실제 타겟값 (ID), shape=(batch, seq_len)
    # pred: 모델 예측값 (logits), shape=(batch, seq_len, vocab_size)
    mask = tf.math.logical_not(tf.math.equal(real, 0)) # 실제 타겟값이 0 (패딩)인 위치는 False
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype) # 마스크 타입을 손실값 타입과 일치시킴
    loss_ *= mask # 패딩 위치의 손실을 0으로 만듦

    return tf.reduce_sum(loss_)/tf.reduce_sum(mask) # 실제 토큰(패딩 제외) 개수로 나누어 평균 손실 계산

# 정확도 함수 (패딩 마스킹 적용)
def accuracy_function(real, pred):
    # 가장 높은 확률을 가진 예측 토큰 ID 추출
    accuracies = tf.equal(real, tf.cast(tf.argmax(pred, axis=2), dtype=real.dtype))

    mask = tf.math.logical_not(tf.math.equal(real, 0)) # 패딩 제외 마스크
    accuracies = tf.math.logical_and(mask, accuracies) # 패딩 아닌 위치 중 예측 맞은 것만 True

    accuracies = tf.cast(accuracies, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    # 실제 토큰 수 대비 정확히 예측한 토큰 수의 비율 계산
    return tf.reduce_sum(accuracies)/tf.reduce_sum(mask)


# 학습률 스케줄 정의 (논문 참고: Warmup + Decay)
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        # step이 warmup_steps보다 작으면 학습률 증가, 크면 감소
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# 옵티마이저 정의
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

In [22]:
# --- 모델 학습 (맞춤형 학습 루프) ---
import time # 시간 측정을 위해 추가

# 학습 관련 상수 확인 (이전 셀에서 정의됨)
if 'EPOCHS' not in locals(): EPOCHS = 20 # 기본값 설정 (이전 셀 실행 권장)
if 'checkpoint_dir' not in locals(): checkpoint_dir = './transformer_checkpoints' # 기본값
# optimizer, loss_function, accuracy_function 등도 이전 셀에서 정의되어 있어야 함
if 'optimizer' not in locals() or 'loss_function' not in locals() or 'accuracy_function' not in locals():
    raise NameError("오류: optimizer, loss_function 또는 accuracy_function이 정의되지 않았습니다.")

print(f"맞춤형 학습 시작 (EPOCHS={EPOCHS})")

# 학습/검증 과정에서의 평균 손실/정확도 추적용 Metric 객체
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.Mean(name='train_accuracy') # 에폭 평균 계산 위해 Mean 사용
val_loss = tf.keras.metrics.Mean(name='val_loss')
val_accuracy = tf.keras.metrics.Mean(name='val_accuracy')

# 체크포인트 관리자 (더 안정적인 저장/로드)
# transformer와 optimizer 객체가 정의되어 있어야 함
if 'transformer' in locals() and transformer is not None:
    ckpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)
    ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_dir, max_to_keep=3) # 최근 3개 체크포인트 유지

    # 최신 체크포인트 복원 (학습 재개 시 유용)
    if ckpt_manager.latest_checkpoint:
        ckpt.restore(ckpt_manager.latest_checkpoint)
        print(f'성공: 최신 체크포인트를 복원했습니다! 경로: {ckpt_manager.latest_checkpoint}')
    else:
        print("정보: 저장된 체크포인트가 없습니다. 처음부터 학습을 시작합니다.")
else:
    print("경고: transformer 객체가 없어 체크포인트 관리를 건너뜁니다.")
    ckpt_manager = None # 체크포인트 관리 비활성화

# 학습 스텝 함수 (tf.function으로 컴파일하여 속도 향상)
# 이 함수들은 이전에 정의된 loss_function, accuracy_function, create_padding_mask, create_look_ahead_mask 사용
@tf.function
def train_step(inputs_dict, targets_dict):
    # 데이터셋에서 입력 분리
    enc_input = inputs_dict['inputs']  # 인코더 입력
    dec_input = inputs_dict['dec_inputs'] # 디코더 입력 (<SOS> 시작)
    targets = targets_dict['outputs']     # 실제 정답 (<EOS> 포함)

    # 마스크 생성
    enc_padding_mask = create_padding_mask(enc_input)
    look_ahead_mask = create_look_ahead_mask(tf.shape(dec_input)[1])
    dec_target_padding_mask = create_padding_mask(dec_input)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
    dec_padding_mask = create_padding_mask(enc_input) # Enc-Dec 어텐션용 마스크

    # 그래디언트 계산을 위한 컨텍스트
    with tf.GradientTape() as tape:
        # 모델 정방향 실행 (training=True)
        predictions, _ = transformer(enc_input,
                                     dec_input,
                                     True, # 학습 중이므로 True
                                     enc_padding_mask,
                                     combined_mask,
                                     dec_padding_mask)
        # 손실 계산 (패딩 제외)
        loss = loss_function(targets, predictions)

    # 그래디언트 계산
    gradients = tape.gradient(loss, transformer.trainable_variables)
    # 옵티마이저로 가중치 업데이트
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    # 손실 및 정확도 기록 (에폭 평균 계산용)
    train_loss(loss)
    train_accuracy(accuracy_function(targets, predictions))

# 검증 스텝 함수 (tf.function으로 컴파일)
@tf.function
def val_step(inputs_dict, targets_dict):
    enc_input = inputs_dict['inputs']
    dec_input = inputs_dict['dec_inputs']
    targets = targets_dict['outputs']

    # 마스크 생성 (train_step과 동일)
    enc_padding_mask = create_padding_mask(enc_input)
    look_ahead_mask = create_look_ahead_mask(tf.shape(dec_input)[1])
    dec_target_padding_mask = create_padding_mask(dec_input)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
    dec_padding_mask = create_padding_mask(enc_input)

    # 모델 예측 (training=False)
    predictions, _ = transformer(enc_input,
                                 dec_input,
                                 False, # 검증 중이므로 False
                                 enc_padding_mask,
                                 combined_mask,
                                 dec_padding_mask)
    # 손실 계산
    loss = loss_function(targets, predictions)

    # 손실 및 정확도 기록
    val_loss(loss)
    val_accuracy(accuracy_function(targets, predictions))


# --- 맞춤형 학습 루프 시작 ---
best_val_loss = float('inf') # 최고 검증 손실 기록용
patience_counter = 0         # 조기 종료 카운터
patience = 3                 # 조기 종료 기준 (예: 3 에폭 동안 개선 없을 시)

for epoch in range(EPOCHS):
    start_time = time.time() # 에폭 시작 시간 기록

    # 각 에폭 시작 시 Metric 초기화
    train_loss.reset_states()
    train_accuracy.reset_states()
    val_loss.reset_states()
    val_accuracy.reset_states()

    # 훈련 데이터셋 루프 (train_dataset은 이전 셀에서 생성됨)
    # train_dataset은 (inputs_dict, targets_dict) 형태의 배치를 반환
    for (batch, (inputs_dict, targets_dict)) in enumerate(train_dataset):
        train_step(inputs_dict, targets_dict) # 각 배치마다 학습 스텝 실행
        # 진행 상황 출력 (예: 100 배치마다)
        if batch % 100 == 0:
             print(f'Epoch {epoch + 1}/{EPOCHS} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

    # 검증 데이터셋 루프 (val_dataset은 이전 셀에서 생성됨)
    for (batch, (inputs_dict, targets_dict)) in enumerate(val_dataset):
        val_step(inputs_dict, targets_dict) # 각 배치마다 검증 스텝 실행

    # 에폭 결과 출력
    print(f'\nEpoch {epoch + 1} 완료:')
    print(f'  훈련 손실: {train_loss.result():.4f} - 훈련 정확도: {train_accuracy.result():.4f}')
    print(f'  검증 손실: {val_loss.result():.4f} - 검증 정확도: {val_accuracy.result():.4f}')
    print(f'  소요 시간: {time.time() - start_time:.2f} 초')

    # 체크포인트 저장 및 조기 종료 확인
    current_val_loss = val_loss.result()
    if current_val_loss < best_val_loss:
        best_val_loss = current_val_loss
        if ckpt_manager: # 체크포인트 매니저가 있을 경우 저장
            ckpt_save_path = ckpt_manager.save()
            print(f'  개선된 검증 손실({best_val_loss:.4f}), 체크포인트 저장 완료: {ckpt_save_path}')
        else:
            print(f'  개선된 검증 손실({best_val_loss:.4f}), (체크포인트 매니저 없음)')
        patience_counter = 0 # 개선되었으므로 카운터 초기화
    else:
        patience_counter += 1
        print(f'  검증 손실 개선 없음 (카운터: {patience_counter}/{patience})')

    # 조기 종료 조건 확인
    if patience_counter >= patience:
        print(f'\n{patience} 에폭 동안 검증 손실이 개선되지 않아 학습을 조기 종료합니다!')
        break # 학습 루프 탈출

print("\n맞춤형 학습 루프 완료.")

# 학습 결과 시각화는 history 객체가 없으므로, 루프 내에서 값을 직접 저장하거나 TensorBoard 콜백을 사용해야 합니다.
# 여기서는 단순화를 위해 시각화 코드는 제외합니다.

맞춤형 학습 시작 (EPOCHS=20)
정보: 저장된 체크포인트가 없습니다. 처음부터 학습을 시작합니다.
Epoch 1/20 Batch 0 Loss 9.2229 Accuracy 0.0000
Epoch 1/20 Batch 100 Loss 8.8890 Accuracy 0.0988

Epoch 1 완료:
  훈련 손실: 8.6275 - 훈련 정확도: 0.1202
  검증 손실: 7.9167 - 검증 정확도: 0.1536
  소요 시간: 18.10 초
  개선된 검증 손실(7.9167), 체크포인트 저장 완료: ./transformer_checkpoints/ckpt-1
Epoch 2/20 Batch 0 Loss 7.9451 Accuracy 0.1582
Epoch 2/20 Batch 100 Loss 7.4793 Accuracy 0.2600

Epoch 2 완료:
  훈련 손실: 7.1759 - 훈련 정확도: 0.2733
  검증 손실: 6.4005 - 검증 정확도: 0.2997
  소요 시간: 9.07 초
  개선된 검증 손실(6.4005), 체크포인트 저장 완료: ./transformer_checkpoints/ckpt-2
Epoch 3/20 Batch 0 Loss 6.2519 Accuracy 0.3181
Epoch 3/20 Batch 100 Loss 6.1074 Accuracy 0.2968

Epoch 3 완료:
  훈련 손실: 5.9901 - 훈련 정확도: 0.2963
  검증 손실: 5.6926 - 검증 정확도: 0.3028
  소요 시간: 9.12 초
  개선된 검증 손실(5.6926), 체크포인트 저장 완료: ./transformer_checkpoints/ckpt-3
Epoch 4/20 Batch 0 Loss 5.6705 Accuracy 0.2927
Epoch 4/20 Batch 100 Loss 5.5287 Accuracy 0.3061

Epoch 4 완료:
  훈련 손실: 5.4773 - 훈련 정확도: 0.3100
  검증 손실: 5.3794 - 검증 정확도

# Step 5. 모델 평가하기

In [25]:
def evaluate(sentence):
    # 1. 입력 문장 전처리 (토큰화, 정수 인코딩, SOS/EOS 토큰 추가는 predict 함수에서 처리)
    # 여기서는 이미 인코딩된 시퀀스를 받는다고 가정하거나, predict 함수 내에서 처리

    # 2. 입력 시퀀스 생성 (SOS 토큰으로 시작)
    # 입력 문장은 인코더 입력으로 사용됨
    encoder_input = tf.expand_dims(sentence, 0) # 배치 차원 추가

    # 디코더 입력은 START_TOKEN으로 시작
    decoder_input = [START_TOKEN]
    output = tf.expand_dims(decoder_input, 0) # 배치 차원 추가

    # 3. 예측 루프 (최대 MAX_LENGTH까지 생성)
    for i in range(MAX_LENGTH):
        # 마스크 생성
        enc_padding_mask = create_padding_mask(encoder_input)
        # 디코더의 self-attention을 위한 마스크
        look_ahead_mask = create_look_ahead_mask(tf.shape(output)[1])
        # 디코더의 패딩 마스크 (현재까지 생성된 시퀀스 기준)
        dec_target_padding_mask = create_padding_mask(output)
        combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
        # 인코더-디코더 어텐션을 위한 패딩 마스크
        dec_padding_mask = create_padding_mask(encoder_input)

        # 모델 예측 (training=False)
        predictions, attention_weights = transformer(encoder_input,
                                                     output,
                                                     False, # 추론 모드
                                                     enc_padding_mask,
                                                     combined_mask,
                                                     dec_padding_mask)

        # 현재 스텝의 예측 결과에서 가장 확률 높은 토큰 선택 (마지막 토큰)
        predictions = predictions[:, -1:, :] # (batch_size, 1, vocab_size)
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        # 만약 END_TOKEN이 예측되면 루프 종료
        if tf.equal(predicted_id, END_TOKEN):
            break

        # 예측된 ID를 디코더 입력(output)에 추가하여 다음 스텝의 입력으로 사용
        output = tf.concat([output, predicted_id], axis=-1)

    # 배치 차원 제거하고 최종 예측 시퀀스 반환 (START_TOKEN 제외)
    return tf.squeeze(output, axis=0), attention_weights

# --- 사용자 입력 처리 및 예측 실행 함수 ---
def predict(sentence):
    # 1. 입력 문장 전처리 (정제)
    sentence = clean_text(sentence) # 이전 단계에서 정의한 clean_text 함수 사용

    # 2. 토큰화 및 정수 인코딩
    # ★ 중요: 입력 문장도 최대 길이에 맞춰 패딩해야 함 (인코더 입력용)
    tokenized_sentence = tokenizer.encode(sentence)
    encoder_input = tf.keras.preprocessing.sequence.pad_sequences(
        [tokenized_sentence], maxlen=MAX_LENGTH, padding='post'
    )
    # pad_sequences는 2D 입력을 기대하므로 리스트로 감싸줌 -> 결과는 (1, MAX_LENGTH)

    # 3. evaluate 함수 호출하여 예측 수행
    # encoder_input[0]을 전달하여 (MAX_LENGTH,) 형태의 1D 텐서로 만듦
    predicted_sequence, _ = evaluate(encoder_input[0])

    # 4. 예측된 정수 시퀀스를 다시 텍스트로 디코딩
    # START_TOKEN과 END_TOKEN은 결과에서 제외
    predicted_sentence = tokenizer.decode(
        [i for i in predicted_sequence if i < START_TOKEN] # START/END 토큰 ID 이후의 값 필터링
    )

    print(f'입력: {sentence}')
    print(f'챗봇 응답: {predicted_sentence}')

    return predicted_sentence

# --- 모델 평가 실행 ---
# 몇 가지 예시 문장으로 챗봇 응답 생성 테스트
print("\n--- 모델 평가 시작 ---")
predict("영화 볼만한 거 추천해줘.")
print("-" * 20)
predict("오늘 날씨 어때?")
print("-" * 20)
predict("너무 피곤하다.")
print("-" * 20)
predict("배고픈데 뭐 먹을까?")
print("-" * 20)
predict("안녕하세요") # 학습 데이터에 없을 법한 인사
print("-" * 20)


--- 모델 평가 시작 ---
입력: 영화 볼만한 거 추천해줘.
챗봇 응답: 최신 영화가 좋을 것 같아요.
--------------------
입력: 오늘 날씨 어때?
챗봇 응답: 날씨 어플에 물어보세요.
--------------------
입력: 너무 피곤하다.
챗봇 응답: 아무래도 사생활이 적으니까요.
--------------------
입력: 배고픈데 뭐 먹을까?
챗봇 응답: 맛있는 거 드세요.
--------------------
입력: 안녕하세요
챗봇 응답: 안녕하세요.
--------------------
