### 1. 데이터 수집 및 한국어에 맞게 전처리

- 찾아보니 unicode 정규화 내용이있어 적용했다.
- 초반에 불러올 때 계속 오류가 나서 왜 데이터가 안불러와지나 했더니 ChatbotData 뒤에 공백문자 하나가 있었다...
- 데이터 이름을 직접 수정하는 것으로 해결

In [28]:
import os
import pandas as pd
import re
import unicodedata

# =============================================================================
# Step 1. 데이터 수집하기
# =============================================================================
data_path = os.path.expanduser("~/aiffel/transformer_chatbot/data/ChatbotData.csv")
try:
    df = pd.read_csv(data_path, encoding="utf-8")
    print("데이터 로드 완료.")
    print(df.head())
except Exception as e:
    print("데이터 로드 실패:", e)

# =============================================================================
# Step 2. 데이터 전처리하기 (한국어)
# =============================================================================
def preprocess_sentence_korean(sentence):
    # 1. Unicode 정규화 (자모 분리 문제 등을 방지)
    sentence = unicodedata.normalize('NFKC', str(sentence))
    # 2. 앞뒤 공백 제거 및 연속된 공백은 한 칸으로 치환
    sentence = sentence.strip()
    sentence = re.sub(r'\s+', ' ', sentence)
    return sentence

if 'Q' in df.columns and 'A' in df.columns:
    df['Q_processed'] = df['Q'].apply(preprocess_sentence_korean)
    df['A_processed'] = df['A'].apply(preprocess_sentence_korean)
    
    print("전처리된 데이터 샘플:")
    print(df[['Q_processed', 'A_processed']].head())
else:
    print("CSV 파일에 'Q'와 'A' 컬럼이 없습니다. 컬럼명을 확인해 주세요.")

데이터 로드 완료.
                 Q            A  label
0           12시 땡!   하루가 또 가네요.      0
1      1지망 학교 떨어졌어    위로해 드립니다.      0
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠.      0
4          PPL 심하네   눈살이 찌푸려지죠.      0
전처리된 데이터 샘플:
       Q_processed  A_processed
0           12시 땡!   하루가 또 가네요.
1      1지망 학교 떨어졌어    위로해 드립니다.
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠.
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠.
4          PPL 심하네   눈살이 찌푸려지죠.


### 2. SubwordTextEncdoer 이용하여 인코딩

In [35]:
# =============================================================================
# Step 3. SubwordTextEncoder 사용하기
# =============================================================================
# 질문과 답변을 리스트로 추출
questions = df['Q_processed'].tolist()
answers = df['A_processed'].tolist()

# 질문과 답변을 합쳐서 코퍼스 구성
corpus = questions + answers
print("SubwordTextEncoder를 통한 토크나이저 생성 중... (잠시 소요될 수 있습니다)")
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(corpus, target_vocab_size=2**13)
print("토크나이저 생성 완료!")
print("생성된 단어장의 크기:", tokenizer.vocab_size)

# 시작 토큰과 종료 토큰 지정
START_TOKEN = [tokenizer.vocab_size]
END_TOKEN = [tokenizer.vocab_size + 1]

# 단어장 크기 계산 (토크나이저 단어 수 + 시작/종료 토큰)
VOCAB_SIZE = tokenizer.vocab_size + 2
print("VOCAB_SIZE:", VOCAB_SIZE)

# 최대 문장 길이 설정
MAX_LENGTH = 40
print("MAX_LENGTH:", MAX_LENGTH)

# 정수 인코딩 및 필터링 함수 정의
def tokenize_and_filter(inputs, outputs):
    tokenized_inputs, tokenized_outputs = [], []
    for (sentence1, sentence2) in zip(inputs, outputs):
        # 정수 인코딩 시 시작, 종료 토큰 추가
        sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
        sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
        
        # 최대 길이 이하인 샘플만 사용
        if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
            tokenized_inputs.append(sentence1)
            tokenized_outputs.append(sentence2)
    # 패딩 처리 (최대 길이 MAX_LENGTH)
    tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
    tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
    return tokenized_inputs, tokenized_outputs

questions, answers = tokenize_and_filter(questions, answers)
print("정수 인코딩 후 질문 샘플 개수:", len(questions))
print("정수 인코딩 후 답변 샘플 개수:", len(answers))

SubwordTextEncoder를 통한 토크나이저 생성 중... (잠시 소요될 수 있습니다)
토크나이저 생성 완료!
생성된 단어장의 크기: 8170
VOCAB_SIZE: 8172
MAX_LENGTH: 40
정수 인코딩 후 질문 샘플 개수: 11823
정수 인코딩 후 답변 샘플 개수: 11823


### 3. Transformer 정의

In [30]:
# =============================================================================
# Step 4. Transformer 모델 구성하기
# =============================================================================
# 1. 포지셔널 인코딩 레이어
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)
    
    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles
    
    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model)
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.stack([sines, cosines], axis=0)
        pos_encoding = tf.transpose(pos_encoding, [1, 2, 0])
        pos_encoding = tf.reshape(pos_encoding, [position, d_model])
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)
    
    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

# 2. 스케일드 닷 프로덕트 어텐션 함수
def scaled_dot_product_attention(query, key, value, mask):
    matmul_qk = tf.matmul(query, key, transpose_b=True)
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = matmul_qk / tf.math.sqrt(depth)
    if mask is not None:
        logits += (mask * -1e9)
    attention_weights = tf.nn.softmax(logits, axis=-1)
    output = tf.matmul(attention_weights, value)
    return output

# 3. 멀티헤드 어텐션 레이어
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super(MultiHeadAttention, self).__init__(name=name)
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads
        self.query_dense = tf.keras.layers.Dense(units=d_model)
        self.key_dense   = tf.keras.layers.Dense(units=d_model)
        self.value_dense = tf.keras.layers.Dense(units=d_model)
        self.dense = tf.keras.layers.Dense(units=d_model)
    
    def split_heads(self, inputs, batch_size):
        inputs = tf.reshape(inputs, shape=(batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(inputs, perm=[0, 2, 1, 3])
    
    def call(self, inputs):
        query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = tf.shape(query)[0]
        query = self.query_dense(query)
        key   = self.key_dense(key)
        value = self.value_dense(value)
        query = self.split_heads(query, batch_size)
        key   = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        outputs = self.dense(concat_attention)
        return outputs

# 4. 패딩 마스크 및 룩어헤드 마스크 함수
def create_padding_mask(x):
    mask = tf.cast(tf.math.equal(x, 0), tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(x):
    seq_len = tf.shape(x)[1]
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    padding_mask = create_padding_mask(x)
    return tf.maximum(look_ahead_mask, padding_mask)

# 5. 인코더 레이어
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
    attention = MultiHeadAttention(d_model, num_heads, name="attention")({
        'query': inputs,
        'key': inputs,
        'value': inputs,
        'mask': padding_mask
    })
    attention = tf.keras.layers.Dropout(rate=dropout)(attention)
    attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)
    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)
    return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)

# 6. 인코더 (여러 인코더 레이어를 쌓음)
def encoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="encoder"):
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
    for i in range(num_layers):
        outputs = encoder_layer(units=units,
                                d_model=d_model,
                                num_heads=num_heads,
                                dropout=dropout,
                                name="encoder_layer_{}".format(i)
                               )([outputs, padding_mask])
    return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)

# 7. 디코더 레이어
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
    attention1 = MultiHeadAttention(d_model, num_heads, name="attention_1")(inputs={
        'query': inputs,
        'key': inputs,
        'value': inputs,
        'mask': look_ahead_mask
    })
    attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)
    attention2 = MultiHeadAttention(d_model, num_heads, name="attention_2")(inputs={
        'query': attention1,
        'key': enc_outputs,
        'value': enc_outputs,
        'mask': padding_mask
    })
    attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
    attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)
    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)
    return tf.keras.Model(
        inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
        outputs=outputs,
        name=name
    )

# 8. 디코더 (여러 디코더 레이어를 쌓음)
def decoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name='decoder'):
    inputs = tf.keras.Input(shape=(None,), name='inputs')
    enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')
    padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
    for i in range(num_layers):
        outputs = decoder_layer(units=units,
                                d_model=d_model,
                                num_heads=num_heads,
                                dropout=dropout,
                                name='decoder_layer_{}'.format(i)
                               )([outputs, enc_outputs, look_ahead_mask, padding_mask])
    return tf.keras.Model(
        inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
        outputs=outputs,
        name=name
    )

# 9. 트랜스포머 모델 (인코더와 디코더 결합)
def transformer(vocab_size, num_layers, units, d_model, num_heads, dropout, name="transformer"):
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")
    enc_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape=(1, 1, None),
                                                name='enc_padding_mask')(inputs)
    look_ahead_mask = tf.keras.layers.Lambda(create_look_ahead_mask, output_shape=(1, None, None),
                                             name='look_ahead_mask')(dec_inputs)
    dec_padding_mask = tf.keras.layers.Lambda(create_padding_mask, output_shape=(1, 1, None),
                                              name='dec_padding_mask')(inputs)
    enc_outputs = encoder(vocab_size=vocab_size,
                          num_layers=num_layers,
                          units=units,
                          d_model=d_model,
                          num_heads=num_heads,
                          dropout=dropout)([inputs, enc_padding_mask])
    dec_outputs = decoder(vocab_size=vocab_size,
                          num_layers=num_layers,
                          units=units,
                          d_model=d_model,
                          num_heads=num_heads,
                          dropout=dropout)([dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
    outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
    return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)

# 하이퍼파라미터 설정
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8
UNITS = 512
DROPOUT = 0.1

tf.keras.backend.clear_session()
model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT
)
model.summary()

Model: "transformer"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
inputs (InputLayer)             [(None, None)]       0                                            
__________________________________________________________________________________________________
dec_inputs (InputLayer)         [(None, None)]       0                                            
__________________________________________________________________________________________________
enc_padding_mask (Lambda)       (None, 1, 1, None)   0           inputs[0][0]                     
__________________________________________________________________________________________________
encoder (Functional)            (None, None, 256)    3146240     inputs[0][0]                     
                                                                 enc_padding_mask[0][0] 

### 4. 모델 훈련 및 컴파일

In [31]:
# =============================================================================
# Step 5. 모델 훈련하기
# =============================================================================
# 손실 함수
def loss_function(y_true, y_pred):
    y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
    loss = loss * mask
    return tf.reduce_mean(loss)

# 커스텀 학습률 스케줄러
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps
    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(learning_rate,
                                     beta_1=0.9,
                                     beta_2=0.98,
                                     epsilon=1e-9)

def accuracy(y_true, y_pred):
    y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
    return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
print("모델 컴파일 완료.")

# teacher forcing을 위한 데이터셋 구성
# questions와 answers는 이미 정수 인코딩 및 패딩되어 있음
# 리스트의 각 요소는 시퀀스(리스트) 형태이므로 numpy 배열로 변환합니다.
questions = np.array([np.array(x) for x in questions])
answers   = np.array([np.array(x) for x in answers])

BATCH_SIZE = 64
BUFFER_SIZE = 20000

dataset = tf.data.Dataset.from_tensor_slices((
    {
        'inputs': questions,
        'dec_inputs': answers[:, :-1]  # 마지막 토큰 제외
    },
    {
        'outputs': answers[:, 1:]      # 시작 토큰 제외
    }
))

dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

print("데이터셋 준비 완료!")

EPOCHS = 10
history = model.fit(dataset, epochs=EPOCHS, verbose=1)
print("훈련 완료!")

모델 컴파일 완료.
데이터셋 준비 완료!
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
훈련 완료!


### 5. 문장 생성 함수 정의 후, 실제로 사용

In [37]:
def decoder_inference(sentence):
    # Step 1에서 사용한 전처리 방법을 적용합니다.
    sentence = preprocess_sentence(sentence)
    
    # 입력 문장을 정수 인코딩하고, 시작 및 종료 토큰을 추가합니다.
    # 예시: "안녕하세요?" → [START_TOKEN, ..., END_TOKEN]
    sentence = tf.expand_dims(START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
    
    # 디코더 입력 초기값은 시작 토큰입니다.
    output_sequence = tf.expand_dims(START_TOKEN, 0)
    
    # 최대 MAX_LENGTH 길이까지 디코더를 통해 다음 토큰을 예측합니다.
    for i in range(MAX_LENGTH):
        # 모델은 encoder 입력(sentence)와 현재까지의 디코더 입력(output_sequence)를 받아 다음 토큰 예측
        predictions = model(inputs=[sentence, output_sequence], training=False)
        # 마지막 시간 스텝의 예측 결과만 추출
        predictions = predictions[:, -1:, :]
        # 가장 높은 확률을 가진 토큰 인덱스 선택
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
        
        # 만약 예측한 토큰이 종료 토큰이면 반복 중단
        if tf.equal(predicted_id, END_TOKEN[0]):
            break
        
        # 예측한 토큰을 디코더 입력에 추가하여 다음 예측에 반영
        output_sequence = tf.concat([output_sequence, predicted_id], axis=-1)
    
    # 예측된 시퀀스를 1차원 텐서로 반환합니다.
    return tf.squeeze(output_sequence, axis=0)


def sentence_generation(sentence):
    # 입력 문장에 대해 디코더 인퍼런스를 진행합니다.
    prediction = decoder_inference(sentence)
    # 예측된 정수 시퀀스를 다시 텍스트로 디코딩합니다.
    # 토크나이저 단어장 크기보다 작은 토큰만 선택하여 디코딩합니다.
    predicted_sentence = tokenizer.decode([i for i in prediction.numpy() if i < tokenizer.vocab_size])
    
    print('입력 : {}'.format(sentence))
    print('출력 : {}'.format(predicted_sentence))
    return predicted_sentence

# 사용 예시:
sentence_generation("당신은 누구세요?")

입력 : 당신은 누구세요?
출력 : 저는 위로봇입니다.


'저는 위로봇입니다.'

### 회고

- 트랜스포머를 직접 하나하나 코드로 짜본 것이 처음이라 굉장히 어려워서 lms를 많이 참고했다.
- 가장 중요한 모델을 하나하나 구현하며 이해도가 올라가는 과정이 즐거웠고, 더 깊게 공부하고 싶어졌다.